From commits-return-28116-archive-asf-public=cust-asf.ponee.io@geode.apache.org Tue Aug 28 21:30:04 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id B0852180674 for ; Tue, 28 Aug 2018 21:30:03 +0200 (CEST) Received: (qmail 70577 invoked by uid 500); 28 Aug 2018 19:30:02 -0000 Mailing-List: contact commits-help@geode.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.apache.org Delivered-To: mailing list commits@geode.apache.org Received: (qmail 70564 invoked by uid 99); 28 Aug 2018 19:30:02 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Aug 2018 19:30:02 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 39168807E4; Tue, 28 Aug 2018 19:30:02 +0000 (UTC) Date: Tue, 28 Aug 2018 19:30:01 +0000 To: "commits@geode.apache.org" Subject: [geode] branch develop updated: Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153548460080.11271.7519785848708001862@gitbox.apache.org> From: eshu11@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: geode X-Git-Refname: refs/heads/develop X-Git-Reftype: branch X-Git-Oldrev: fd29e625c482c9b983d69a4b1ad2abee3775a69b X-Git-Newrev: 926f35eb1d20c3b4bbe26472aea6ad74048dc8aa X-Git-Rev: 926f35eb1d20c3b4bbe26472aea6ad74048dc8aa X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git The following commit(s) were added to refs/heads/develop by this push: new 926f35e Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388) 926f35e is described below commit 926f35eb1d20c3b4bbe26472aea6ad74048dc8aa Author: pivotal-eshu AuthorDate: Tue Aug 28 12:29:56 2018 -0700 Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388) --- .../ClientServerJTAFailoverDistributedTest.java | 37 +++++- .../geode/internal/cache/AfterCompletion.java | 100 ++++++++++++++++ .../geode/internal/cache/BeforeCompletion.java | 68 +++++++++++ .../internal/cache/SingleThreadJTAExecutor.java | 71 +++++++++++ .../org/apache/geode/internal/cache/TXState.java | 56 +++++++-- .../geode/internal/cache/AfterCompletionTest.java | 131 +++++++++++++++++++++ .../geode/internal/cache/BeforeCompletionTest.java | 98 +++++++++++++++ .../cache/SingleThreadJTAExecutorTest.java | 66 +++++++++++ .../apache/geode/internal/cache/TXStateTest.java | 50 +++----- 9 files changed, 632 insertions(+), 45 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java index cb6be25..5d60b6b 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.apache.geode.cache.PartitionAttributesFactory; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; import org.apache.geode.cache.TransactionId; import org.apache.geode.cache.client.ClientRegionFactory; import org.apache.geode.cache.client.PoolFactory; @@ -65,12 +66,14 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { private String hostName; private String uniqueName; private String regionName; + private String replicateRegionName; private VM server1; private VM server2; private VM server3; private VM client1; private int port1; private int port2; + private boolean hasReplicateRegion = false; @Rule public DistributedRule distributedRule = new DistributedRule(); @@ -94,6 +97,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { hostName = getHostName(); uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName(); regionName = uniqueName + "_region"; + replicateRegionName = uniqueName + "_replicate_region"; } @Test @@ -130,6 +134,12 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { regionFactory.setPartitionAttributes(partitionAttributesFactory.create()); regionFactory.create(regionName); + if (hasReplicateRegion) { + cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE) + .create(replicateRegionName); + } + + CacheServer server = cacheRule.getCache().addCacheServer(); server.setPort(0); server.start(); @@ -152,6 +162,10 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { clientRegionFactory.setPoolName(pool.getName()); clientRegionFactory.create(regionName); + if (hasReplicateRegion) { + clientRegionFactory.create(replicateRegionName); + } + if (ports.length > 1) { pool.acquireConnection(new ServerLocation(hostName, port1)); } @@ -174,10 +188,13 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { Object[] results = new Object[2]; InternalClientCache cache = clientCacheRule.getClientCache(); Region region = cache.getRegion(regionName); + Region replicateRegion = hasReplicateRegion ? cache.getRegion(replicateRegionName) : null; TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); txManager.begin(); region.put(key, newValue); - + if (hasReplicateRegion) { + replicateRegion.put(key, newValue); + } TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState(); ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null); clientTXStateStub.beforeCompletion(); @@ -191,6 +208,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { private void doAfterCompletion(TransactionId transactionId, boolean isCommit) { InternalClientCache cache = clientCacheRule.getClientCache(); Region region = cache.getRegion(regionName); + Region replicateRegion = cache.getRegion(replicateRegionName); TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); txManager.resume(transactionId); @@ -205,6 +223,9 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { } if (isCommit) { assertEquals(newValue, region.get(key)); + if (hasReplicateRegion) { + assertEquals(newValue, replicateRegion.get(key)); + } } else { assertEquals(value, region.get(key)); } @@ -295,4 +316,18 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable { txStateStub.beforeCompletion(); } + @Test + public void jtaCanFailoverToJTAHostForMixedRegionsAfterDoneBeforeCompletion() { + hasReplicateRegion = true; + port2 = server2.invoke(() -> createServerRegion(1, false)); + server2.invoke(() -> doPut(key, value)); + port1 = server1.invoke(() -> createServerRegion(1, true)); + + client1.invoke(() -> createClientRegion(port1, port2)); + Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion()); + + server1.invoke(() -> cacheRule.getCache().close()); + + client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true)); + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java new file mode 100644 index 0000000..028e067 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.function.BooleanSupplier; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.internal.logging.LogService; + +public class AfterCompletion { + private static final Logger logger = LogService.getLogger(); + + private boolean started; + private boolean finished; + private int status = -1; + private boolean cancelled; + private RuntimeException exception; + + public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) { + // there should be a transaction timeout that keeps this thread + // from sitting around forever if the client goes away + // The above was done by setting afterCompletionCancelled in txState + // during cleanup. When client departed, the transaction/JTA + // will be timed out and cleanup code will be executed. + waitForExecuteOrCancel(cancelCriterion); + started = true; + logger.debug("executing afterCompletion notification"); + + try { + if (cancelled) { + txState.doCleanup(); + } else { + txState.doAfterCompletion(status); + } + } catch (RuntimeException exception) { + this.exception = exception; + } finally { + logger.debug("afterCompletion notification completed"); + finished = true; + notifyAll(); + } + } + + private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) { + waitForCondition(cancelCriterion, () -> status != -1 || cancelled); + } + + private synchronized void waitForCondition(CancelCriterion cancelCriterion, + BooleanSupplier condition) { + while (!condition.getAsBoolean()) { + cancelCriterion.checkCancelInProgress(null); + try { + logger.debug("waiting for notification"); + wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + } + } + + public synchronized void execute(CancelCriterion cancelCriterion, int status) { + this.status = status; + signalAndWaitForDoOp(cancelCriterion); + } + + private void signalAndWaitForDoOp(CancelCriterion cancelCriterion) { + notifyAll(); + waitUntilFinished(cancelCriterion); + if (exception != null) { + throw exception; + } + } + + private void waitUntilFinished(CancelCriterion cancelCriterion) { + waitForCondition(cancelCriterion, () -> finished); + } + + public synchronized void cancel(CancelCriterion cancelCriterion) { + cancelled = true; + signalAndWaitForDoOp(cancelCriterion); + } + + public synchronized boolean isStarted() { + return started; + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java new file mode 100644 index 0000000..247a0f2 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.SynchronizationCommitConflictException; +import org.apache.geode.internal.logging.LogService; + +public class BeforeCompletion { + private static final Logger logger = LogService.getLogger(); + + private boolean started; + private boolean finished; + private SynchronizationCommitConflictException exception; + + public synchronized void doOp(TXState txState) { + try { + txState.doBeforeCompletion(); + } catch (SynchronizationCommitConflictException exception) { + this.exception = exception; + } finally { + logger.debug("beforeCompletion notification completed"); + finished = true; + notifyAll(); + } + } + + public synchronized void execute(CancelCriterion cancelCriterion) { + started = true; + waitUntilFinished(cancelCriterion); + if (exception != null) { + throw exception; + } + } + + private void waitUntilFinished(CancelCriterion cancelCriterion) { + while (!finished) { + cancelCriterion.checkCancelInProgress(null); + try { + wait(1000); + } catch (InterruptedException ignore) { + // eat the interrupt and check for exit conditions + } + } + } + + public synchronized boolean isStarted() { + return started; + } + + public synchronized boolean isFinished() { + return finished; + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java new file mode 100644 index 0000000..636def9 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import java.util.concurrent.Executor; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.internal.logging.LogService; + +/** + * This class ensures that beforeCompletion and afterCompletion are executed in the same thread. + * + * @since Geode 1.7.0 + */ +public class SingleThreadJTAExecutor { + private static final Logger logger = LogService.getLogger(); + + private final BeforeCompletion beforeCompletion; + private final AfterCompletion afterCompletion; + + public SingleThreadJTAExecutor() { + this(new BeforeCompletion(), new AfterCompletion()); + } + + public SingleThreadJTAExecutor(BeforeCompletion beforeCompletion, + AfterCompletion afterCompletion) { + this.beforeCompletion = beforeCompletion; + this.afterCompletion = afterCompletion; + } + + private void doOps(TXState txState, CancelCriterion cancelCriterion) { + beforeCompletion.doOp(txState); + afterCompletion.doOp(txState, cancelCriterion); + } + + public void executeBeforeCompletion(TXState txState, Executor executor, + CancelCriterion cancelCriterion) { + executor.execute(() -> doOps(txState, cancelCriterion)); + + beforeCompletion.execute(cancelCriterion); + } + + public void executeAfterCompletion(CancelCriterion cancelCriterion, int status) { + afterCompletion.execute(cancelCriterion, status); + } + + /** + * stop waiting for an afterCompletion to arrive and just exit + */ + public void cleanup(CancelCriterion cancelCriterion) { + afterCompletion.cancel(cancelCriterion); + } + + public boolean shouldDoCleanup() { + return beforeCompletion.isFinished() && !afterCompletion.isStarted(); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java index 9494fd3..5263e2e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java @@ -24,12 +24,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.locks.ReentrantLock; import javax.transaction.Status; import org.apache.logging.log4j.Logger; +import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.InternalGemFireError; import org.apache.geode.SystemFailure; @@ -101,6 +103,13 @@ public class TXState implements TXStateInterface { // Access this variable should be in synchronized block. private boolean beforeCompletionCalled; + /** + * for client/server JTA transactions we need to have a single thread handle both beforeCompletion + * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step. + * This is that thread + */ + private final SingleThreadJTAExecutor singleThreadJTAExecutor; + // Internal testing hooks private Runnable internalAfterReservation; protected Runnable internalAfterConflictCheck; @@ -142,6 +151,11 @@ public class TXState implements TXStateInterface { private volatile DistributedMember proxyServer; public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) { + this(proxy, onBehalfOfRemoteStub, new SingleThreadJTAExecutor()); + } + + public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub, + SingleThreadJTAExecutor singleThreadJTAExecutor) { this.beginTime = CachePerfStats.getStatTime(); this.regions = new IdentityHashMap<>(); @@ -154,7 +168,7 @@ public class TXState implements TXStateInterface { this.internalAfterSend = null; this.proxy = proxy; this.onBehalfOfRemoteStub = onBehalfOfRemoteStub; - + this.singleThreadJTAExecutor = singleThreadJTAExecutor; } private boolean hasSeenEvent(EntryEventImpl event) { @@ -417,7 +431,7 @@ public class TXState implements TXStateInterface { } /* - * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort + * If there is a TransactionWriter plugged in, we need to to give it an opportunity to cleanup * the transaction. */ TransactionWriter writer = this.proxy.getTxMgr().getWriter(); @@ -857,6 +871,14 @@ public class TXState implements TXStateInterface { } protected void cleanup() { + if (singleThreadJTAExecutor.shouldDoCleanup()) { + singleThreadJTAExecutor.cleanup(getCancelCriterion()); + } else { + doCleanup(); + } + } + + void doCleanup() { IllegalArgumentException iae = null; try { this.closed = true; @@ -910,6 +932,7 @@ public class TXState implements TXStateInterface { synchronized (this.completionGuard) { this.completionGuard.notifyAll(); } + if (iae != null && !this.proxy.getCache().isClosed()) { throw iae; } @@ -1006,24 +1029,36 @@ public class TXState implements TXStateInterface { */ @Override public synchronized void beforeCompletion() throws SynchronizationCommitConflictException { + proxy.getTxMgr().setTXState(null); if (this.closed) { throw new TXManagerCancelledException(); } + if (beforeCompletionCalled) { // do not re-execute beforeCompletion again return; } beforeCompletionCalled = true; - doBeforeCompletion(); + singleThreadJTAExecutor.executeBeforeCompletion(this, + getExecutor(), getCancelCriterion()); } - private void doBeforeCompletion() { + private Executor getExecutor() { + return getCache().getDistributionManager().getWaitingThreadPool(); + } + + private CancelCriterion getCancelCriterion() { + return getCache().getCancelCriterion(); + } + + void doBeforeCompletion() { final long opStart = CachePerfStats.getStatTime(); this.jtaLifeTime = opStart - getBeginTime(); + try { reserveAndCheck(); /* - * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort + * If there is a TransactionWriter plugged in, we need to to give it an opportunity to cleanup * the transaction. */ TransactionWriter writer = this.proxy.getTxMgr().getWriter(); @@ -1072,11 +1107,12 @@ public class TXState implements TXStateInterface { */ @Override public synchronized void afterCompletion(int status) { - this.proxy.getTxMgr().setTXState(null); - // For commit, beforeCompletion should be called. Otherwise + proxy.getTxMgr().setTXState(null); + // if there was a beforeCompletion call then there will be a thread + // sitting in the waiting pool to execute afterCompletion. Otherwise // throw FailedSynchronizationException(). - if (wasBeforeCompletionCalled()) { - doAfterCompletion(status); + if (beforeCompletionCalled) { + singleThreadJTAExecutor.executeAfterCompletion(getCancelCriterion(), status); } else { // rollback does not run beforeCompletion. if (status != Status.STATUS_ROLLEDBACK) { @@ -1087,7 +1123,7 @@ public class TXState implements TXStateInterface { } } - private void doAfterCompletion(int status) { + void doAfterCompletion(int status) { final long opStart = CachePerfStats.getStatTime(); try { switch (status) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java new file mode 100644 index 0000000..d94df0e --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.TimeUnit; + +import javax.transaction.Status; + +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.CancelCriterion; + +public class AfterCompletionTest { + private AfterCompletion afterCompletion; + private CancelCriterion cancelCriterion; + private TXState txState; + private Thread doOpThread; + + @Before + public void setup() { + afterCompletion = new AfterCompletion(); + cancelCriterion = mock(CancelCriterion.class); + txState = mock(TXState.class); + } + + @Test + public void executeThrowsIfCancelCriterionThrows() { + doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null); + + assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED)) + .isInstanceOf(RuntimeException.class); + } + + @Test + public void cancelThrowsIfCancelCriterionThrows() { + doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null); + + assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion)) + .isInstanceOf(RuntimeException.class); + } + + @Test + public void isStartedReturnsFalseIfNotExecuted() { + assertThat(afterCompletion.isStarted()).isFalse(); + } + + @Test + public void isStartedReturnsTrueIfExecuted() { + startDoOp(); + + afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED); + + verifyDoOpFinished(); + assertThat(afterCompletion.isStarted()).isTrue(); + } + + @Test + public void executeCallsDoAfterCompletion() { + startDoOp(); + + afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED); + verifyDoOpFinished(); + verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED)); + } + + @Test + public void executeThrowsDoAfterCompletionThrows() { + startDoOp(); + doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED); + + assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED)) + .isInstanceOf(RuntimeException.class); + + verifyDoOpFinished(); + } + + @Test + public void cancelCallsDoCleanup() { + startDoOp(); + + afterCompletion.cancel(cancelCriterion); + verifyDoOpFinished(); + verify(txState, times(1)).doCleanup(); + } + + @Test + public void cancelThrowsDoCleanupThrows() { + startDoOp(); + doThrow(new RuntimeException()).when(txState).doCleanup(); + + assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion)) + .isInstanceOf(RuntimeException.class); + + verifyDoOpFinished(); + } + + private void startDoOp() { + doOpThread = new Thread(() -> afterCompletion.doOp(txState, cancelCriterion)); + doOpThread.start(); + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null)); + + } + + private void verifyDoOpFinished() { + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !doOpThread.isAlive()); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java new file mode 100644 index 0000000..1f541b6 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.TimeUnit; + +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.CancelCriterion; +import org.apache.geode.cache.SynchronizationCommitConflictException; + +public class BeforeCompletionTest { + + private BeforeCompletion beforeCompletion; + private CancelCriterion cancelCriterion; + private TXState txState; + + @Before + public void setup() { + beforeCompletion = new BeforeCompletion(); + cancelCriterion = mock(CancelCriterion.class); + txState = mock(TXState.class); + } + + @Test + public void executeThrowsExceptionIfDoOpFailedWithException() { + doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion(); + + beforeCompletion.doOp(txState); + + assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion)) + .isInstanceOf(SynchronizationCommitConflictException.class); + } + + @Test + public void doOpCallsDoBeforeCompletion() { + beforeCompletion.doOp(txState); + + verify(txState, times(1)).doBeforeCompletion(); + } + + @Test + public void isStartedReturnsFalseIfNotExecuted() { + assertThat(beforeCompletion.isStarted()).isFalse(); + } + + @Test + public void isStartedReturnsTrueIfExecuted() { + beforeCompletion.doOp(txState); + beforeCompletion.execute(cancelCriterion); + + assertThat(beforeCompletion.isStarted()).isTrue(); + } + + @Test + public void executeThrowsIfCancelCriterionThrows() { + doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null); + + assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion)) + .isInstanceOf(RuntimeException.class); + } + + @Test + public void executeWaitsUntilDoOpFinish() throws Exception { + Thread thread = new Thread(() -> beforeCompletion.execute(cancelCriterion)); + thread.start(); + // give the thread a chance to get past the "finished" check by waiting until + // checkCancelInProgress is called + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null)); + + beforeCompletion.doOp(txState); + + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !(thread.isAlive())); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java new file mode 100644 index 0000000..bc48eb4 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import org.apache.geode.CancelCriterion; + +public class SingleThreadJTAExecutorTest { + private SingleThreadJTAExecutor singleThreadJTAExecutor; + private TXState txState; + private ExecutorService executor; + private BeforeCompletion beforeCompletion; + private AfterCompletion afterCompletion; + private CancelCriterion cancelCriterion; + + @Before + public void setup() { + txState = mock(TXState.class, RETURNS_DEEP_STUBS); + executor = Executors.newSingleThreadExecutor(); + beforeCompletion = mock(BeforeCompletion.class); + afterCompletion = mock(AfterCompletion.class); + cancelCriterion = mock(CancelCriterion.class); + singleThreadJTAExecutor = new SingleThreadJTAExecutor(beforeCompletion, afterCompletion); + } + + @Test + public void executeBeforeCompletionCallsDoOps() { + InOrder inOrder = inOrder(beforeCompletion, afterCompletion); + + singleThreadJTAExecutor.executeBeforeCompletion(txState, executor, cancelCriterion); + + verify(beforeCompletion, times(1)).execute(eq(cancelCriterion)); + Awaitility.await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> inOrder.verify(beforeCompletion, times(1)).doOp(eq(txState))); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted( + () -> inOrder.verify(afterCompletion, times(1)).doOp(eq(txState), eq(cancelCriterion))); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java index 5ec4cbd..21fadf0 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -32,10 +33,9 @@ import org.junit.Before; import org.junit.Test; import org.apache.geode.cache.CommitConflictException; +import org.apache.geode.cache.FailedSynchronizationException; import org.apache.geode.cache.SynchronizationCommitConflictException; import org.apache.geode.cache.TransactionDataNodeHasDepartedException; -import org.apache.geode.cache.TransactionException; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; public class TXStateTest { private TXStateProxyImpl txStateProxy; @@ -44,60 +44,51 @@ public class TXStateTest { @Before public void setup() { - txStateProxy = mock(TXStateProxyImpl.class); + txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS); exception = new CommitConflictException(""); transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException(""); when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class)); } - @Test - public void beforeCompletionThrowsIfReserveAndCheckFails() { + public void doBeforeCompletionThrowsIfReserveAndCheckFails() { TXState txState = spy(new TXState(txStateProxy, true)); doThrow(exception).when(txState).reserveAndCheck(); - assertThatThrownBy(() -> txState.beforeCompletion()) + assertThatThrownBy(() -> txState.doBeforeCompletion()) .isInstanceOf(SynchronizationCommitConflictException.class); } - @Test - public void afterCompletionThrowsIfCommitFails() { + public void doAfterCompletionThrowsIfCommitFails() { TXState txState = spy(new TXState(txStateProxy, true)); - doReturn(mock(InternalCache.class)).when(txState).getCache(); doReturn(true).when(txState).wasBeforeCompletionCalled(); txState.reserveAndCheck(); doThrow(transactionDataNodeHasDepartedException).when(txState).commit(); - assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) + assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED)) .isSameAs(transactionDataNodeHasDepartedException); } @Test - public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() { - TXState txState = spy(new TXState(txStateProxy, true)); - doReturn(mock(InternalCache.class)).when(txState).getCache(); - doReturn(true).when(txState).wasBeforeCompletionCalled(); - doThrow(exception).when(txState).commit(); - - assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED)) - .isInstanceOf(TransactionException.class); - } - - @Test - public void afterCompletionCanCommitJTA() { + public void doAfterCompletionCanCommitJTA() { TXState txState = spy(new TXState(txStateProxy, false)); - doReturn(mock(InternalCache.class)).when(txState).getCache(); txState.reserveAndCheck(); txState.closed = true; doReturn(true).when(txState).wasBeforeCompletionCalled(); - txState.afterCompletion(Status.STATUS_COMMITTED); + txState.doAfterCompletion(Status.STATUS_COMMITTED); assertThat(txState.locks).isNull(); verify(txState, times(1)).saveTXCommitMessageForClientFailover(); } + @Test(expected = FailedSynchronizationException.class) + public void afterCompletionThrowsExceptionIfBeforeCompletionNotCalled() { + TXState txState = new TXState(txStateProxy, true); + txState.afterCompletion(Status.STATUS_COMMITTED); + } + @Test public void afterCompletionCanRollbackJTA() { TXState txState = spy(new TXState(txStateProxy, true)); @@ -153,16 +144,7 @@ public class TXStateTest { public void getOriginatingMemberReturnsNullIfNotOriginatedFromClient() { TXState txState = spy(new TXState(txStateProxy, false)); - assertThat(txState.getOriginatingMember()).isNull(); + assertThat(txState.getOriginatingMember()).isSameAs(txStateProxy.getOnBehalfOfClientMember()); } - @Test - public void getOriginatingMemberReturnsClientMemberIfOriginatedFromClient() { - InternalDistributedMember client = mock(InternalDistributedMember.class); - TXStateProxyImpl proxy = new TXStateProxyImpl(mock(InternalCache.class), - mock(TXManagerImpl.class), mock(TXId.class), client); - TXState txState = spy(new TXState(proxy, false)); - - assertThat(txState.getOriginatingMember()).isEqualTo(client); - } }