Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D06D4200B33 for ; Wed, 15 Jun 2016 02:07:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CF0D3160A62; Wed, 15 Jun 2016 00:07:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C68CC160A06 for ; Wed, 15 Jun 2016 02:07:28 +0200 (CEST) Received: (qmail 85319 invoked by uid 500); 15 Jun 2016 00:07:28 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 85310 invoked by uid 99); 15 Jun 2016 00:07:27 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Jun 2016 00:07:27 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 8B41FC214D for ; Wed, 15 Jun 2016 00:07:27 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id esw-oIFr0zLd for ; Wed, 15 Jun 2016 00:07:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx2-lw-eu.apache.org (ASF Mail Server at mx2-lw-eu.apache.org) with SMTP id 1A8A760E1C for ; Wed, 15 Jun 2016 00:07:21 +0000 (UTC) Received: (qmail 83246 invoked by uid 99); 15 Jun 2016 00:07:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Jun 2016 00:07:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 40069E2C1A; Wed, 15 Jun 2016 00:07:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hiteshkhamesra@apache.org To: commits@geode.incubator.apache.org Date: Wed, 15 Jun 2016 00:08:40 -0000 Message-Id: In-Reply-To: <60f2027f4d98435cba2ee6ef29b9a036@git.apache.org> References: <60f2027f4d98435cba2ee6ef29b9a036@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [84/94] [abbrv] incubator-geode git commit: GEODE-1517: fix TXStateProxyImpl continue working after TXManagerImpl is closed. archived-at: Wed, 15 Jun 2016 00:07:30 -0000 GEODE-1517: fix TXStateProxyImpl continue working after TXManagerImpl is closed. Hold the ReentrantLock during TXManagerImpl.close() when closing TXStateProxyImpl. Prevent any TXStateProxyImpl from working further. Add a test case that fails without the above fix and passes with it. Also avoid going through the synchronized failover map to check whether a TXStateProxyImpl is finished; instead, use its isInProgress() call. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6f70cd70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6f70cd70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6f70cd70 Branch: refs/heads/feature/GEODE-1372 Commit: 6f70cd703139417388fbd9845dd8aefb3b3f7c30 Parents: d07c966 Author: eshu Authored: Mon Jun 13 16:44:55 2016 -0700 Committer: eshu Committed: Mon Jun 13 16:44:55 2016 -0700 ---------------------------------------------------------------------- .../internal/cache/RemoteOperationMessage.java | 10 ++- .../gemfire/internal/cache/TXManagerImpl.java | 32 ++++----- .../cache/partitioned/PartitionMessage.java | 10 ++- .../cache/RemoteOperationMessageTest.java | 31 +++++++-- .../internal/cache/TXManagerImplTest.java | 73 +++++++++++++++----- .../cache/partitioned/PartitionMessageTest.java | 27 ++++++-- 6 files changed, 120 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java index 19e1dea..db5bcca 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java @@ -241,8 +241,10 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme sendReply = operateOnRegion(dm, r, startTime); } else { try { - TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId()); - if (!hasTxAlreadyFinished(tx, txMgr, txid)) { + if 
(txMgr.isClosed()) { + // NO DISTRIBUTED MESSAGING CAN BE DONE HERE! + sendReply = false; + } else if (tx.isInProgress()) { sendReply = operateOnRegion(dm, r, startTime); } } finally { @@ -317,10 +319,6 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme } } - boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) { - return txMgr.hasTxAlreadyFinished(tx, txid); - } - TXManagerImpl getTXManager(GemFireCacheImpl cache) { return cache.getTxManager(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java index 2608878..1ea7f71 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java @@ -90,7 +90,7 @@ public class TXManagerImpl implements CacheTransactionManager, private final ArrayList txListeners = new ArrayList(8); public TransactionWriter writer = null; - private boolean closed = false; + private volatile boolean closed = false; private final Map hostedTXStates; @@ -577,7 +577,12 @@ public class TXManagerImpl implements CacheTransactionManager, } this.closed = true; for (TXStateProxy proxy: this.hostedTXStates.values()) { - proxy.close(); + proxy.getLock().lock(); + try { + proxy.close(); + } finally { + proxy.getLock().unlock(); + } } for (TXStateProxy proxy: this.localTxMap.values()) { proxy.close(); @@ -646,7 +651,7 @@ public class TXManagerImpl implements CacheTransactionManager, } } - private final boolean isClosed() { + public final boolean isClosed() { return this.closed; } private final void checkClosed() { @@ -752,10 +757,13 @@ public class 
TXManagerImpl implements CacheTransactionManager, // Inflight op could be received later than TXFailover operation. if (curVal == null) { if (!isHostedTxRecentlyCompleted(key)) { - this.hostedTXStates.put(key, val); // Failover op removed the val // It is possible that the same operation can be executed // twice by two threads, but data is consistent. + this.hostedTXStates.put(key, val); + } else { + //Another thread should complete the transaction + logger.info("{} has already finished." , val.getTxId()); } } else { if (val != curVal) { @@ -770,22 +778,6 @@ public class TXManagerImpl implements CacheTransactionManager, return true; } - public boolean hasTxAlreadyFinished(TXStateProxy tx, TXId txid) { - if (tx == null) { - return false; - } - if (isHostedTxRecentlyCompleted(txid)) { - //Should only happen when handling a later arrival of transactional op from proxy, - //while the transaction has failed over and already committed or rolled back. - //Just send back reply as a success op. - //The client connection should be lost from proxy, or - //the proxy is closed for failover to occur. - logger.info("TxId {} has already finished." , txid); - return true; - } - return false; - } - /** * Associate the remote txState with the thread processing this message. Also, * we acquire a lock on the txState, on which this thread operates. 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java index 1b83ee3..14fce08 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java @@ -278,10 +278,6 @@ public abstract class PartitionMessage extends DistributionMessage implements return (ds == null || ds.isDisconnecting()); } - boolean hasTxAlreadyFinished(TXStateProxy tx, TXManagerImpl txMgr, TXId txid) { - return txMgr.hasTxAlreadyFinished(tx, txid); - } - PartitionedRegion getPartitionedRegion() throws PRLocallyDestroyedException { return PartitionedRegion.getPRFromId(this.regionId); } @@ -343,8 +339,10 @@ public abstract class PartitionMessage extends DistributionMessage implements sendReply = operateOnPartitionedRegion(dm, pr, startTime); } else { try { - TXId txid = new TXId(getMemberToMasqueradeAs(), getTXUniqId()); - if (!hasTxAlreadyFinished(tx, txMgr, txid)) { + if (txMgr.isClosed()) { + // NO DISTRIBUTED MESSAGING CAN BE DONE HERE! 
+ sendReply = false; + } else if (tx.isInProgress()) { sendReply = operateOnPartitionedRegion(dm, pr, startTime); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java index ecfc2b0..18ce2a9 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessageTest.java @@ -25,7 +25,6 @@ import org.mockito.internal.stubbing.answers.CallsRealMethods; import com.gemstone.gemfire.distributed.internal.DistributionManager; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.TXId; import com.gemstone.gemfire.internal.cache.TXManagerImpl; import com.gemstone.gemfire.internal.cache.TXStateProxy; import com.gemstone.gemfire.internal.cache.TXStateProxyImpl; @@ -40,7 +39,6 @@ public class RemoteOperationMessageTest { private DistributionManager dm; private LocalRegion r; private TXManagerImpl txMgr; - private TXId txid; private long startTime = 0; TXStateProxy tx; @@ -51,7 +49,6 @@ public class RemoteOperationMessageTest { msg = mock(RemoteOperationMessage.class); r = mock(LocalRegion.class); txMgr = mock(TXManagerImpl.class); - txid = new TXId(null, 0); tx = mock(TXStateProxyImpl.class); when(msg.checkCacheClosing(dm)).thenReturn(false); @@ -59,9 +56,8 @@ public class RemoteOperationMessageTest { when(msg.getCache(dm)).thenReturn(cache); when(msg.getRegionByPath(cache)).thenReturn(r); when(msg.getTXManager(cache)).thenReturn(txMgr); - when(txMgr.hasTxAlreadyFinished(tx, txid)).thenCallRealMethod(); - doAnswer(new 
CallsRealMethods()).when(msg).process(dm); + doAnswer(new CallsRealMethods()).when(msg).process(dm); } @Test @@ -75,7 +71,7 @@ public class RemoteOperationMessageTest { @Test public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, RemoteOperationException { when(txMgr.masqueradeAs(msg)).thenReturn(tx); - when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenCallRealMethod(); + when(tx.isInProgress()).thenReturn(true); msg.process(dm); verify(msg, times(1)).operateOnRegion(dm, r, startTime); @@ -84,10 +80,31 @@ public class RemoteOperationMessageTest { @Test public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, RemoteOperationException { when(txMgr.masqueradeAs(msg)).thenReturn(tx); - when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true); + when(tx.isInProgress()).thenReturn(false); msg.process(dm); verify(msg, times(0)).operateOnRegion(dm, r, startTime); } + @Test + public void noNewTxProcessingAfterTXManagerImplClosed() throws RemoteOperationException { + txMgr = new TXManagerImpl(null, cache); + + when(msg.checkCacheClosing(dm)).thenReturn(false); + when(msg.checkDSClosing(dm)).thenReturn(false); + when(msg.getCache(dm)).thenReturn(cache); + when(msg.getRegionByPath(cache)).thenReturn(r); + when(msg.getTXManager(cache)).thenReturn(txMgr); + + when(msg.canParticipateInTransaction()).thenReturn(true); + when(msg.canStartRemoteTransaction()).thenReturn(true); + + msg.process(dm); + + txMgr.close(); + + msg.process(dm); + + verify(msg, times(1)).operateOnRegion(dm, r, startTime); + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java index 
ce24947..ae4f378 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TXManagerImplTest.java @@ -49,6 +49,7 @@ public class TXManagerImplTest { TXStateProxy tx1, tx2; DistributionManager dm; TXRemoteRollbackMessage rollbackMsg; + TXRemoteCommitMessage commitMsg; @Before public void setUp() { @@ -63,9 +64,11 @@ public class TXManagerImplTest { notCompletedTxid = new TXId(member, 2); latch = new CountDownLatch(1); rollbackMsg = new TXRemoteRollbackMessage(); + commitMsg = new TXRemoteCommitMessage(); when(this.msg.canStartRemoteTransaction()).thenReturn(true); when(this.msg.canParticipateInTransaction()).thenReturn(true); + } @Test @@ -142,9 +145,9 @@ public class TXManagerImplTest { assertFalse(txMgr.getLock(tx, txid)); } - + @Test - public void getLockAfterTXStateCommitted() throws InterruptedException{ + public void getLockAfterTXStateCommitted() throws InterruptedException{ TXStateProxy oldtx = txMgr.getOrSetHostedTXState(txid, msg); assertEquals(oldtx, txMgr.getHostedTXState(txid)); @@ -157,6 +160,20 @@ public class TXManagerImplTest { Thread t1 = new Thread(new Runnable() { public void run() { + when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class)); + TXStateProxy tx; + try { + tx = txMgr.masqueradeAs(commitMsg); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + tx.setCommitOnBehalfOfRemoteStub(true); + try { + tx.commit(); + } finally { + txMgr.unmasquerade(tx); + } txMgr.removeHostedTXState(txid); txMgr.saveTXCommitMessageForClientFailover(txid, txCommitMsg); } @@ -168,11 +185,12 @@ public class TXManagerImplTest { TXStateProxy curTx = txMgr.getHostedTXState(txid); assertNull(curTx); + assertFalse(tx.isInProgress()); //after TXStateProxy committed, getLock will get the lock for the oldtx //but caller should not perform ops on this TXStateProxy assertTrue(txMgr.getLock(tx, 
txid)); } - + @Test public void masqueradeAsCanGetLock() throws InterruptedException{ TXStateProxy tx; @@ -222,21 +240,43 @@ public class TXManagerImplTest { } @Test - public void hasTxAlreadyFinishedDetectsNoTx() { - assertFalse(txMgr.hasTxAlreadyFinished(null, txid)); + public void testTxStateWithNotFinishedTx() { + TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg); + assertTrue(tx.isInProgress()); } - + @Test - public void hasTxAlreadyFinishedDetectsTxNotFinished() { - TXStateProxy tx = txMgr.getOrSetHostedTXState(notCompletedTxid, msg); - assertFalse(txMgr.hasTxAlreadyFinished(tx, notCompletedTxid)); + public void testTxStateWithCommittedTx() throws InterruptedException { + when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class)); + setupTx(); + + TXStateProxy tx = txMgr.masqueradeAs(commitMsg); + try { + tx.commit(); + } finally { + txMgr.unmasquerade(tx); + } + assertFalse(tx.isInProgress()); } @Test - public void hasTxAlreadyFinishedDetectsTxFinished() throws InterruptedException { - TXStateProxy tx = txMgr.getOrSetHostedTXState(completedTxid, msg); - txMgr.saveTXCommitMessageForClientFailover(completedTxid, txCommitMsg); - assertTrue(txMgr.hasTxAlreadyFinished(tx, completedTxid)); + public void testTxStateWithRolledBackTx() throws InterruptedException { + when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class)); + setupTx(); + + TXStateProxy tx = txMgr.masqueradeAs(rollbackMsg); + try { + tx.rollback(); + } finally { + txMgr.unmasquerade(tx); + } + assertFalse(tx.isInProgress()); + } + + private void setupTx() throws InterruptedException { + TXStateProxy tx = txMgr.masqueradeAs(msg); + tx.setCommitOnBehalfOfRemoteStub(true); + txMgr.unmasquerade(tx); } @Test @@ -251,11 +291,8 @@ public class TXManagerImplTest { e.printStackTrace(); throw new RuntimeException(e); } - try { - msg.process(dm); - } finally { - txMgr.unmasquerade(tx1); - } + + msg.process(dm); TXStateProxy existingTx = 
masqueradeToRollback(); latch.countDown(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6f70cd70/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java index bbbf714..e05e130 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessageTest.java @@ -31,7 +31,6 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager; import com.gemstone.gemfire.internal.cache.DataLocationException; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.PartitionedRegion; -import com.gemstone.gemfire.internal.cache.TXId; import com.gemstone.gemfire.internal.cache.TXManagerImpl; import com.gemstone.gemfire.internal.cache.TXStateProxy; import com.gemstone.gemfire.internal.cache.TXStateProxyImpl; @@ -47,7 +46,6 @@ public class PartitionMessageTest { private DistributionManager dm; private PartitionedRegion pr; private TXManagerImpl txMgr; - private TXId txid; private long startTime = 1; TXStateProxy tx; @@ -59,7 +57,6 @@ public class PartitionMessageTest { pr = mock(PartitionedRegion.class); txMgr = mock(TXManagerImpl.class); tx = mock(TXStateProxyImpl.class); - txid = new TXId(null, 0); when(msg.checkCacheClosing(dm)).thenReturn(false); when(msg.checkDSClosing(dm)).thenReturn(false); @@ -67,9 +64,8 @@ public class PartitionMessageTest { when(msg.getGemFireCacheImpl()).thenReturn(cache); when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime); when(msg.getTXManagerImpl(cache)).thenReturn(txMgr); - 
when(msg.hasTxAlreadyFinished(null, txMgr, txid)).thenCallRealMethod(); - doAnswer(new CallsRealMethods()).when(msg).process(dm); + doAnswer(new CallsRealMethods()).when(msg).process(dm); } @Test @@ -83,6 +79,7 @@ public class PartitionMessageTest { @Test public void messageForNotFinishedTXPerformsOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException { when(txMgr.masqueradeAs(msg)).thenReturn(tx); + when(tx.isInProgress()).thenReturn(true); msg.process(dm); verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime); @@ -91,10 +88,28 @@ public class PartitionMessageTest { @Test public void messageForFinishedTXDoesNotPerformOnRegion() throws InterruptedException, CacheException, QueryException, DataLocationException, IOException { when(txMgr.masqueradeAs(msg)).thenReturn(tx); - when(msg.hasTxAlreadyFinished(tx, txMgr, txid)).thenReturn(true); + when(tx.isInProgress()).thenReturn(false); msg.process(dm); verify(msg, times(0)).operateOnPartitionedRegion(dm, pr, startTime); } + @Test + public void noNewTxProcessingAfterTXManagerImplClosed() throws CacheException, QueryException, DataLocationException, InterruptedException, IOException { + txMgr = new TXManagerImpl(null, cache); + when(msg.getPartitionedRegion()).thenReturn(pr); + when(msg.getGemFireCacheImpl()).thenReturn(cache); + when(msg.getStartPartitionMessageProcessingTime(pr)).thenReturn(startTime); + when(msg.getTXManagerImpl(cache)).thenReturn(txMgr); + when(msg.canParticipateInTransaction()).thenReturn(true); + when(msg.canStartRemoteTransaction()).thenReturn(true); + + msg.process(dm); + + txMgr.close(); + + msg.process(dm); + + verify(msg, times(1)).operateOnPartitionedRegion(dm, pr, startTime); + } }