Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B4DEE200D26 for ; Fri, 6 Oct 2017 01:45:45 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B31B11609E2; Thu, 5 Oct 2017 23:45:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 641BC160BDA for ; Fri, 6 Oct 2017 01:45:44 +0200 (CEST) Received: (qmail 72300 invoked by uid 500); 5 Oct 2017 23:45:43 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 72291 invoked by uid 99); 5 Oct 2017 23:45:43 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Oct 2017 23:45:43 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id C7A1981A88; Thu, 5 Oct 2017 23:45:40 +0000 (UTC) Date: Thu, 05 Oct 2017 23:45:40 +0000 To: "commits@geode.apache.org" Subject: [geode] 01/01: GEODE-3679 Forward client member id to other peers in transaction message. 
MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: eshu11@apache.org In-Reply-To: <150724713979.19874.7234235345746034974@gitbox.apache.org> References: <150724713979.19874.7234235345746034974@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/feature/GEODE-3679 X-Git-Reftype: branch X-Git-Rev: c34132f3f1cf99163ef4b0dcb1055fd0a73525ed X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20171005234541.C7A1981A88@gitbox.apache.org> archived-at: Thu, 05 Oct 2017 23:45:45 -0000 This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-3679 in repository https://gitbox.apache.org/repos/asf/geode.git commit c34132f3f1cf99163ef4b0dcb1055fd0a73525ed Author: eshu AuthorDate: Mon Oct 2 11:22:32 2017 -0700 GEODE-3679 Forward client member id to other peers in transaction message. * Do not forward the size request of a bucket region to the transaction hosting node. * Move the method from LocalRegion to DistributedRegion. * Make the region in TXRegionStub strongly typed to avoid some casting in the code. 
--- .../apache/geode/internal/cache/BucketRegion.java | 6 + .../geode/internal/cache/DistributedRegion.java | 27 ++++ .../apache/geode/internal/cache/LocalRegion.java | 74 ++++----- .../geode/internal/cache/PeerTXStateStub.java | 4 +- .../cache/partitioned/PartitionMessage.java | 7 +- .../cache/tx/AbstractPeerTXRegionStub.java | 4 +- .../internal/cache/tx/DistributedTXRegionStub.java | 21 +-- .../internal/cache/tx/PartitionedTXRegionStub.java | 31 ++-- .../cache/ClientServerTransactionDUnitTest.java | 167 +++++++++++++++++++++ .../cache/tx/AbstractPeerTXRegionStubTest.java | 2 +- 10 files changed, 265 insertions(+), 78 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index c76e19e..cc84b9f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -1392,6 +1392,12 @@ public class BucketRegion extends DistributedRegion implements Bucket { } @Override + public int getRegionSize(DistributedMember target) { + // GEODE-3679. Do not forward the request again. 
+ return getRegionSize(); + } + + @Override public void checkReadiness() { super.checkReadiness(); if (isDestroyed()) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index e882ed1..20d9f15 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -61,6 +61,8 @@ import org.apache.geode.cache.RegionMembershipListener; import org.apache.geode.cache.ResumptionAction; import org.apache.geode.cache.RoleException; import org.apache.geode.cache.TimeoutException; +import org.apache.geode.cache.TransactionDataNotColocatedException; +import org.apache.geode.cache.TransactionException; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionException; @@ -101,6 +103,7 @@ import org.apache.geode.internal.cache.execute.LocalResultCollector; import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.lru.LRUEntry; +import org.apache.geode.internal.cache.partitioned.RemoteSizeMessage; import org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor; import org.apache.geode.internal.cache.persistence.PersistenceAdvisor; import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl; @@ -3553,6 +3556,30 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA return super.getOwnerForKey(key); } + + /** + * Returns the size in this region. + * + * This is used in a transaction to find the size of the region on the transaction hosting node. 
+ * + * @param target the host of the transaction TXState + * @return the number of entries in this region + */ + public int getRegionSize(DistributedMember target) { + try { + RemoteSizeMessage.SizeResponse response = + RemoteSizeMessage.send(Collections.singleton(target), this); + return response.waitForSize(); + } catch (RegionDestroyedException rde) { + throw new TransactionDataNotColocatedException( + LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION + .toLocalizedString(rde.getRegionFullPath()), + rde); + } catch (Exception e) { + throw new TransactionException(e); + } + } + /** * Execute the provided named function in all locations that contain the given keys. So function * can be executed on just one fabric node, executed in parallel on a subset of nodes in parallel diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 47ce7e0..bc384c2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -17,6 +17,43 @@ package org.apache.geode.internal.cache; import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator; import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.AbstractSet; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.Transaction; + import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.CopyHelper; @@ -183,43 +220,6 @@ import org.apache.geode.pdx.JSONFormatter; import org.apache.geode.pdx.PdxInstance; import org.apache.logging.log4j.Logger; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.AbstractSet; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import 
javax.transaction.RollbackException; -import javax.transaction.Status; -import javax.transaction.SystemException; -import javax.transaction.Transaction; - /** * Implementation of a local scoped-region. Note that this class has a different meaning starting * with 3.0. In previous versions, a LocalRegion was the representation of a region in the VM. diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java index 8e9ea89..80a2fc6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java @@ -169,14 +169,14 @@ public class PeerTXStateStub extends TXStateStub { TXRegionStub stub = null; if (region.getPartitionAttributes() != null) { // a partitioned region - stub = new PartitionedTXRegionStub(this, region); + stub = new PartitionedTXRegionStub(this, (PartitionedRegion) region); } else if (region.getScope().isLocal()) { // GEODE-3744 Local region should not be involved in a transaction on a PeerTXStateStub throw new TransactionException( "Local region " + region + " should not participate in a transaction not hosted locally"); } else { // This is a dist region - stub = new DistributedTXRegionStub(this, region); + stub = new DistributedTXRegionStub(this, (DistributedRegion) region); } return stub; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java index 8c27107..64bebc4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java @@ -166,9 +166,10 @@ public abstract class PartitionMessage extends DistributionMessage "Sending remote txId even though transaction is 
local. This should never happen: txState=" + txState); } - } - if (txState != null && txState.isMemberIdForwardingRequired()) { - this.txMemberId = txState.getOriginatingMember(); + // GEODE-3679. Even if TXStateProxy has a local transaction, + // we still need to forward original txMemberId to other nodes + // if the message does not start a new transaction. + this.txMemberId = txState.getTxId().getMemberId(); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java index b4a2998..78273c5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java @@ -29,11 +29,9 @@ import org.apache.geode.internal.i18n.LocalizedStrings; public abstract class AbstractPeerTXRegionStub implements TXRegionStub { protected final TXStateStub state; - protected final LocalRegion region; - public AbstractPeerTXRegionStub(TXStateStub txstate, LocalRegion r) { + public AbstractPeerTXRegionStub(TXStateStub txstate) { this.state = txstate; - this.region = r; } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java index 384135d..17bd83b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java @@ -26,6 +26,7 @@ import org.apache.geode.cache.Region.Entry; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.DistributedPutAllOperation; +import org.apache.geode.internal.cache.DistributedRegion; import 
org.apache.geode.internal.cache.DistributedRemoveAllOperation; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.KeyInfo; @@ -45,17 +46,16 @@ import org.apache.geode.internal.cache.RemoteContainsKeyValueMessage.RemoteConta import org.apache.geode.internal.cache.RemoteOperationMessage.RemoteOperationResponse; import org.apache.geode.internal.cache.RemotePutMessage.PutResult; import org.apache.geode.internal.cache.RemotePutMessage.RemotePutResponse; -import org.apache.geode.internal.cache.partitioned.RemoteSizeMessage; import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; import org.apache.geode.internal.i18n.LocalizedStrings; public class DistributedTXRegionStub extends AbstractPeerTXRegionStub { - private final LocalRegion region; + private final DistributedRegion region; - public DistributedTXRegionStub(TXStateStub txstate, LocalRegion r) { - super(txstate, r); + public DistributedTXRegionStub(TXStateStub txstate, DistributedRegion r) { + super(txstate); this.region = r; } @@ -224,18 +224,7 @@ public class DistributedTXRegionStub extends AbstractPeerTXRegionStub { public int entryCount() { - try { - RemoteSizeMessage.SizeResponse response = - RemoteSizeMessage.send(Collections.singleton(state.getTarget()), region); - return response.waitForSize(); - } catch (RegionDestroyedException rde) { - throw new TransactionDataNotColocatedException( - LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION - .toLocalizedString(rde.getRegionFullPath()), - rde); - } catch (Exception e) { - throw new TransactionException(e); - } + return this.region.getRegionSize(this.state.getTarget()); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java index 10ae7a5..5673c68 100644 --- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java @@ -63,8 +63,11 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { */ private Map buckets = new HashMap(); - public PartitionedTXRegionStub(TXStateStub txstate, LocalRegion r) { - super(txstate, r); + private final PartitionedRegion region; + + public PartitionedTXRegionStub(TXStateStub txstate, PartitionedRegion r) { + super(txstate); + this.region = r; } public Map getBuckets() { @@ -136,7 +139,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { private boolean isKeyInNonColocatedBucket(KeyInfo keyInfo) { Map, TXRegionStub> regionStubs = this.state.getRegionStubs(); Collection colcatedRegions = (Collection) ColocationHelper - .getAllColocationRegions((PartitionedRegion) this.region).values(); + .getAllColocationRegions(this.region).values(); // get all colocated region buckets touched in the transaction for (PartitionedRegion colcatedRegion : colcatedRegions) { PartitionedTXRegionStub regionStub = @@ -160,9 +163,8 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { public Entry getEntry(KeyInfo keyInfo, boolean allowTombstones) { - PartitionedRegion pr = (PartitionedRegion) region; try { - Entry e = pr.getEntryRemotely((InternalDistributedMember) state.getTarget(), + Entry e = region.getEntryRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey(), false, allowTombstones); trackBucketForTx(keyInfo); return e; @@ -238,9 +240,8 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { public boolean containsKey(KeyInfo keyInfo) { - PartitionedRegion pr = (PartitionedRegion) region; try { - boolean retVal = pr.containsKeyRemotely((InternalDistributedMember) state.getTarget(), + boolean retVal = region.containsKeyRemotely((InternalDistributedMember) 
state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey()); trackBucketForTx(keyInfo); return retVal; @@ -282,10 +283,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { public boolean containsValueForKey(KeyInfo keyInfo) { - PartitionedRegion pr = (PartitionedRegion) region; try { - boolean retVal = pr.containsValueForKeyRemotely((InternalDistributedMember) state.getTarget(), - keyInfo.getBucketId(), keyInfo.getKey()); + boolean retVal = region.containsValueForKeyRemotely( + (InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey()); trackBucketForTx(keyInfo); return retVal; } catch (TransactionException e) { @@ -318,10 +318,10 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { Object retVal = null; final Object key = keyInfo.getKey(); final Object callbackArgument = keyInfo.getCallbackArg(); - PartitionedRegion pr = (PartitionedRegion) region; try { - retVal = pr.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), - key, callbackArgument, peferCD, requestingClient, clientEvent, false); + retVal = + region.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), + key, callbackArgument, peferCD, requestingClient, clientEvent, false); } catch (TransactionException e) { RuntimeException re = getTransactionException(keyInfo, e); re.initCause(e.getCause()); @@ -347,12 +347,11 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub { public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstones) { - PartitionedRegion pr = (PartitionedRegion) region; - InternalDistributedMember primary = pr.getBucketPrimary(keyInfo.getBucketId()); + InternalDistributedMember primary = region.getBucketPrimary(keyInfo.getBucketId()); if (primary.equals(state.getTarget())) { return getEntry(keyInfo, allowTombstones); } else { - return pr.getSharedDataView().getEntry(keyInfo, pr, allowTombstones); + return 
region.getSharedDataView().getEntry(keyInfo, region, allowTombstones); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java index 96b89b9..c7ae750 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java @@ -4065,4 +4065,171 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest Assert.fail("Unexpected exception while doing JTA Transaction2 ", e); } } + + @Test + public void testPartitionMessageSetsClientMemberIdAsTxMemberId() { + Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM client = host.getVM(2); + int totalBuckets = 50; + String regionName = "region"; + + setupRegionForClientTransactions(totalBuckets, regionName, false, null); + + client.invokeAsync(() -> doKeySetOpTransaction(1, regionName, totalBuckets, false, null)); + + // Should cause TXId(server1, 1) to be executed on server2 + server1.invoke(() -> doPutOpTransaction(regionName, totalBuckets)); + } + + private void doKeySetOpTransaction(int firstGetKey, String regionName, int totalBuckets, + boolean withReplicateRegion, String region2Name) { + Region region = getCache().getRegion(regionName); + TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager(); + txMgr.begin(); + region.get(firstGetKey); // starts TXState on a server with the primary bucket of the key + verifyKeySetOp(totalBuckets, region); + + if (withReplicateRegion) { + Region region2 = getCache().getRegion(region2Name); + int num = totalBuckets + 1; + region2.put(num, "" + num); + verifyKeySetOp(num, region2); + } + txMgr.rollback(); + } + + private void verifyKeySetOp(int expected, Region region) { + Set keys = region.keySet(); + assertEquals(expected, keys.size()); + for 
(Integer key : keys) { + assertTrue(key <= expected); + } + } + + private void doPutOpTransaction(String regionName, int totalBuckets) throws InterruptedException { + TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager(); + Region region = getCache().getRegion(regionName); + txMgr.begin(); + region.put(2, "NEWVALUE"); + Thread.currentThread().sleep(100); + txMgr.commit(); + } + + private void doSizeOpTransactions(String regionName, int totalBuckets, String region2Name) { + for (int i = 1; i <= totalBuckets; i++) { + doSizeOpTransaction(i, regionName, totalBuckets, region2Name); + } + } + + private void doSizeOpTransaction(int key, String regionName, int totalBuckets, + String region2Name) { + Region region = getCache().getRegion(regionName); + Region region2 = getCache().getRegion(region2Name); + TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager(); + txMgr.begin(); + region.get(key); // starts TXState on different servers + assertEquals(totalBuckets, region.size()); + int num = totalBuckets + 1; + region2.put(num, "" + num); + assertEquals(num, region2.size()); + txMgr.rollback(); + } + + @Test + public void testSizeOpInTransaction() { + Host host = Host.getHost(0); + VM client = host.getVM(2); + String regionName = "region"; + String region2Name = "region2"; + int totalBuckets = 2; + setupRegionForClientTransactions(totalBuckets, regionName, true, region2Name); + + client.invoke(() -> doSizeOpTransactions(regionName, totalBuckets, region2Name)); + } + + private void setupRegionForClientTransactions(int totalBuckets, String regionName, + boolean withReplicateRegion, String region2Name) { + Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + int port = createRegionsAndStartServer(server1, true); + + createPRAndInitABucketOnServer1(totalBuckets, regionName, server1); + + createPRAndInitOtherBucketsOnServer2(totalBuckets, regionName, server2); + + if 
(withReplicateRegion) { + initReplicateRegion(totalBuckets, region2Name, server1, server2); + } + + createRegionOnClient(regionName, withReplicateRegion, region2Name, client, port); + } + + private void createRegionOnClient(String regionName, boolean withReplicateRegion, + String region2Name, VM client, int port) { + client.invoke(() -> { + createClient(port, regionName); + if (withReplicateRegion) { + createClient(port, region2Name); + } + }); + } + + private void initReplicateRegion(int totalBuckets, String region2Name, VM server1, VM server2) { + server1.invoke(() -> createReplicateRegion(region2Name)); + server2.invoke(() -> { + createReplicateRegion(region2Name); + Region region = getCache().getRegion(region2Name); + for (int i = totalBuckets; i > 0; i--) { + region.put(i, "" + i); + } + }); + } + + private void createPRAndInitOtherBucketsOnServer2(int totalBuckets, String regionName, + VM server2) { + createRegionOnServer(server2); + server2.invoke(() -> { + createSubscriptionRegion(false, regionName, 0, totalBuckets); + Region region = getCache().getRegion(regionName); + for (int i = totalBuckets; i > 1; i--) { + region.put(i, "VALUE-" + i); + } + }); + } + + private void createPRAndInitABucketOnServer1(int totalBuckets, String regionName, VM server1) { + server1.invoke(() -> { + createSubscriptionRegion(false, regionName, 0, totalBuckets); + Region region = getCache().getRegion(regionName); + // should create first bucket on server1 + region.put(1, "VALUE-1"); + }); + } + + @Test + public void testKeySetOpInTransaction() { + Host host = Host.getHost(0); + VM client = host.getVM(2); + String regionName = "region"; + String region2Name = "region2"; + int totalBuckets = 2; + setupRegionForClientTransactions(totalBuckets, regionName, true, region2Name); + + client.invoke(() -> doKeySetOpTransactions(regionName, totalBuckets, true, region2Name)); + } + + private void doKeySetOpTransactions(String regionName, int totalBuckets, + boolean withReplicateRegion, 
String region2Name) { + for (int i = 1; i <= totalBuckets; i++) { + doKeySetOpTransaction(i, regionName, totalBuckets, withReplicateRegion, region2Name); + } + } + + private void createReplicateRegion(String regionName) { + RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE); + Region region = rf.create(regionName); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java index 80888ef..a7240bb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java @@ -51,7 +51,7 @@ public class AbstractPeerTXRegionStubTest { private class TestingAbstractPeerTXRegionStub extends AbstractPeerTXRegionStub { private TestingAbstractPeerTXRegionStub(TXStateStub txState, LocalRegion r) { - super(txState, r); + super(txState); } @Override -- To stop receiving notification emails like this one, please contact "commits@geode.apache.org" .