geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esh...@apache.org
Subject [geode] branch develop updated: Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388)
Date Tue, 28 Aug 2018 19:30:01 GMT
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 926f35e  Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388)
926f35e is described below

commit 926f35eb1d20c3b4bbe26472aea6ad74048dc8aa
Author: pivotal-eshu <eshu@pivotal.io>
AuthorDate: Tue Aug 28 12:29:56 2018 -0700

    Feature/geode 5624 Use a single thread to ensure beforeCompletion and afterCompletion are executed by the same thread. (#2388)
---
 .../ClientServerJTAFailoverDistributedTest.java    |  37 +++++-
 .../geode/internal/cache/AfterCompletion.java      | 100 ++++++++++++++++
 .../geode/internal/cache/BeforeCompletion.java     |  68 +++++++++++
 .../internal/cache/SingleThreadJTAExecutor.java    |  71 +++++++++++
 .../org/apache/geode/internal/cache/TXState.java   |  56 +++++++--
 .../geode/internal/cache/AfterCompletionTest.java  | 131 +++++++++++++++++++++
 .../geode/internal/cache/BeforeCompletionTest.java |  98 +++++++++++++++
 .../cache/SingleThreadJTAExecutorTest.java         |  66 +++++++++++
 .../apache/geode/internal/cache/TXStateTest.java   |  50 +++-----
 9 files changed, 632 insertions(+), 45 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
index cb6be25..5d60b6b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/jta/ClientServerJTAFailoverDistributedTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.TransactionId;
 import org.apache.geode.cache.client.ClientRegionFactory;
 import org.apache.geode.cache.client.PoolFactory;
@@ -65,12 +66,14 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
   private String hostName;
   private String uniqueName;
   private String regionName;
+  private String replicateRegionName;
   private VM server1;
   private VM server2;
   private VM server3;
   private VM client1;
   private int port1;
   private int port2;
+  private boolean hasReplicateRegion = false;
 
   @Rule
   public DistributedRule distributedRule = new DistributedRule();
@@ -94,6 +97,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     hostName = getHostName();
     uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
     regionName = uniqueName + "_region";
+    replicateRegionName = uniqueName + "_replicate_region";
   }
 
   @Test
@@ -130,6 +134,12 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
     regionFactory.create(regionName);
 
+    if (hasReplicateRegion) {
+      cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.REPLICATE)
+          .create(replicateRegionName);
+    }
+
+
     CacheServer server = cacheRule.getCache().addCacheServer();
     server.setPort(0);
     server.start();
@@ -152,6 +162,10 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     clientRegionFactory.setPoolName(pool.getName());
     clientRegionFactory.create(regionName);
 
+    if (hasReplicateRegion) {
+      clientRegionFactory.create(replicateRegionName);
+    }
+
     if (ports.length > 1) {
       pool.acquireConnection(new ServerLocation(hostName, port1));
     }
@@ -174,10 +188,13 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     Object[] results = new Object[2];
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
+    Region replicateRegion = hasReplicateRegion ? cache.getRegion(replicateRegionName) : null;
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.begin();
     region.put(key, newValue);
-
+    if (hasReplicateRegion) {
+      replicateRegion.put(key, newValue);
+    }
     TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
     ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
     clientTXStateStub.beforeCompletion();
@@ -191,6 +208,7 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
   private void doAfterCompletion(TransactionId transactionId, boolean isCommit) {
     InternalClientCache cache = clientCacheRule.getClientCache();
     Region region = cache.getRegion(regionName);
+    Region replicateRegion = cache.getRegion(replicateRegionName);
     TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
     txManager.resume(transactionId);
 
@@ -205,6 +223,9 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     }
     if (isCommit) {
       assertEquals(newValue, region.get(key));
+      if (hasReplicateRegion) {
+        assertEquals(newValue, replicateRegion.get(key));
+      }
     } else {
       assertEquals(value, region.get(key));
     }
@@ -295,4 +316,18 @@ public class ClientServerJTAFailoverDistributedTest implements Serializable {
     txStateStub.beforeCompletion();
   }
 
+  @Test
+  public void jtaCanFailoverToJTAHostForMixedRegionsAfterDoneBeforeCompletion() {
+    hasReplicateRegion = true;
+    port2 = server2.invoke(() -> createServerRegion(1, false));
+    server2.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+    Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+    server1.invoke(() -> cacheRule.getCache().close());
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true));
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
new file mode 100644
index 0000000..028e067
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AfterCompletion.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.function.BooleanSupplier;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.logging.LogService;
+
+public class AfterCompletion {
+  private static final Logger logger = LogService.getLogger();
+
+  private boolean started;
+  private boolean finished;
+  private int status = -1;
+  private boolean cancelled;
+  private RuntimeException exception;
+
+  public synchronized void doOp(TXState txState, CancelCriterion cancelCriterion) {
+    // there should be a transaction timeout that keeps this thread
+    // from sitting around forever if the client goes away
+    // The above was done by setting afterCompletionCancelled in txState
+    // during cleanup. When client departed, the transaction/JTA
+    // will be timed out and cleanup code will be executed.
+    waitForExecuteOrCancel(cancelCriterion);
+    started = true;
+    logger.debug("executing afterCompletion notification");
+
+    try {
+      if (cancelled) {
+        txState.doCleanup();
+      } else {
+        txState.doAfterCompletion(status);
+      }
+    } catch (RuntimeException exception) {
+      this.exception = exception;
+    } finally {
+      logger.debug("afterCompletion notification completed");
+      finished = true;
+      notifyAll();
+    }
+  }
+
+  private void waitForExecuteOrCancel(CancelCriterion cancelCriterion) {
+    waitForCondition(cancelCriterion, () -> status != -1 || cancelled);
+  }
+
+  private synchronized void waitForCondition(CancelCriterion cancelCriterion,
+      BooleanSupplier condition) {
+    while (!condition.getAsBoolean()) {
+      cancelCriterion.checkCancelInProgress(null);
+      try {
+        logger.debug("waiting for notification");
+        wait(1000);
+      } catch (InterruptedException ignore) {
+        // eat the interrupt and check for exit conditions
+      }
+    }
+  }
+
+  public synchronized void execute(CancelCriterion cancelCriterion, int status) {
+    this.status = status;
+    signalAndWaitForDoOp(cancelCriterion);
+  }
+
+  private void signalAndWaitForDoOp(CancelCriterion cancelCriterion) {
+    notifyAll();
+    waitUntilFinished(cancelCriterion);
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  private void waitUntilFinished(CancelCriterion cancelCriterion) {
+    waitForCondition(cancelCriterion, () -> finished);
+  }
+
+  public synchronized void cancel(CancelCriterion cancelCriterion) {
+    cancelled = true;
+    signalAndWaitForDoOp(cancelCriterion);
+  }
+
+  public synchronized boolean isStarted() {
+    return started;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
new file mode 100644
index 0000000..247a0f2
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BeforeCompletion.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.internal.logging.LogService;
+
+public class BeforeCompletion {
+  private static final Logger logger = LogService.getLogger();
+
+  private boolean started;
+  private boolean finished;
+  private SynchronizationCommitConflictException exception;
+
+  public synchronized void doOp(TXState txState) {
+    try {
+      txState.doBeforeCompletion();
+    } catch (SynchronizationCommitConflictException exception) {
+      this.exception = exception;
+    } finally {
+      logger.debug("beforeCompletion notification completed");
+      finished = true;
+      notifyAll();
+    }
+  }
+
+  public synchronized void execute(CancelCriterion cancelCriterion) {
+    started = true;
+    waitUntilFinished(cancelCriterion);
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  private void waitUntilFinished(CancelCriterion cancelCriterion) {
+    while (!finished) {
+      cancelCriterion.checkCancelInProgress(null);
+      try {
+        wait(1000);
+      } catch (InterruptedException ignore) {
+        // eat the interrupt and check for exit conditions
+      }
+    }
+  }
+
+  public synchronized boolean isStarted() {
+    return started;
+  }
+
+  public synchronized boolean isFinished() {
+    return finished;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
new file mode 100644
index 0000000..636def9
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SingleThreadJTAExecutor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.util.concurrent.Executor;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * This class ensures that beforeCompletion and afterCompletion are executed in the same thread.
+ *
+ * @since Geode 1.7.0
+ */
+public class SingleThreadJTAExecutor {
+  private static final Logger logger = LogService.getLogger();
+
+  private final BeforeCompletion beforeCompletion;
+  private final AfterCompletion afterCompletion;
+
+  public SingleThreadJTAExecutor() {
+    this(new BeforeCompletion(), new AfterCompletion());
+  }
+
+  public SingleThreadJTAExecutor(BeforeCompletion beforeCompletion,
+      AfterCompletion afterCompletion) {
+    this.beforeCompletion = beforeCompletion;
+    this.afterCompletion = afterCompletion;
+  }
+
+  private void doOps(TXState txState, CancelCriterion cancelCriterion) {
+    beforeCompletion.doOp(txState);
+    afterCompletion.doOp(txState, cancelCriterion);
+  }
+
+  public void executeBeforeCompletion(TXState txState, Executor executor,
+      CancelCriterion cancelCriterion) {
+    executor.execute(() -> doOps(txState, cancelCriterion));
+
+    beforeCompletion.execute(cancelCriterion);
+  }
+
+  public void executeAfterCompletion(CancelCriterion cancelCriterion, int status) {
+    afterCompletion.execute(cancelCriterion, status);
+  }
+
+  /**
+   * stop waiting for an afterCompletion to arrive and just exit
+   */
+  public void cleanup(CancelCriterion cancelCriterion) {
+    afterCompletion.cancel(cancelCriterion);
+  }
+
+  public boolean shouldDoCleanup() {
+    return beforeCompletion.isFinished() && !afterCompletion.isStarted();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9494fd3..5263e2e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,12 +24,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
@@ -101,6 +103,13 @@ public class TXState implements TXStateInterface {
   // Access this variable should be in synchronized block.
   private boolean beforeCompletionCalled;
 
+  /**
+   * for client/server JTA transactions we need to have a single thread handle both beforeCompletion
+   * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
+   * This is that thread
+   */
+  private final SingleThreadJTAExecutor singleThreadJTAExecutor;
+
   // Internal testing hooks
   private Runnable internalAfterReservation;
   protected Runnable internalAfterConflictCheck;
@@ -142,6 +151,11 @@ public class TXState implements TXStateInterface {
   private volatile DistributedMember proxyServer;
 
   public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) {
+    this(proxy, onBehalfOfRemoteStub, new SingleThreadJTAExecutor());
+  }
+
+  public TXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub,
+      SingleThreadJTAExecutor singleThreadJTAExecutor) {
     this.beginTime = CachePerfStats.getStatTime();
     this.regions = new IdentityHashMap<>();
 
@@ -154,7 +168,7 @@ public class TXState implements TXStateInterface {
     this.internalAfterSend = null;
     this.proxy = proxy;
     this.onBehalfOfRemoteStub = onBehalfOfRemoteStub;
-
+    this.singleThreadJTAExecutor = singleThreadJTAExecutor;
   }
 
   private boolean hasSeenEvent(EntryEventImpl event) {
@@ -417,7 +431,7 @@ public class TXState implements TXStateInterface {
       }
 
       /*
-       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to cleanup
        * the transaction.
        */
       TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -857,6 +871,14 @@ public class TXState implements TXStateInterface {
   }
 
   protected void cleanup() {
+    if (singleThreadJTAExecutor.shouldDoCleanup()) {
+      singleThreadJTAExecutor.cleanup(getCancelCriterion());
+    } else {
+      doCleanup();
+    }
+  }
+
+  void doCleanup() {
     IllegalArgumentException iae = null;
     try {
       this.closed = true;
@@ -910,6 +932,7 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
+
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -1006,24 +1029,36 @@ public class TXState implements TXStateInterface {
    */
   @Override
   public synchronized void beforeCompletion() throws SynchronizationCommitConflictException {
+    proxy.getTxMgr().setTXState(null);
     if (this.closed) {
       throw new TXManagerCancelledException();
     }
+
     if (beforeCompletionCalled) {
       // do not re-execute beforeCompletion again
       return;
     }
     beforeCompletionCalled = true;
-    doBeforeCompletion();
+    singleThreadJTAExecutor.executeBeforeCompletion(this,
+        getExecutor(), getCancelCriterion());
   }
 
-  private void doBeforeCompletion() {
+  private Executor getExecutor() {
+    return getCache().getDistributionManager().getWaitingThreadPool();
+  }
+
+  private CancelCriterion getCancelCriterion() {
+    return getCache().getCancelCriterion();
+  }
+
+  void doBeforeCompletion() {
     final long opStart = CachePerfStats.getStatTime();
     this.jtaLifeTime = opStart - getBeginTime();
+
     try {
       reserveAndCheck();
       /*
-       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to abort
+       * If there is a TransactionWriter plugged in, we need to to give it an opportunity to cleanup
        * the transaction.
        */
       TransactionWriter writer = this.proxy.getTxMgr().getWriter();
@@ -1072,11 +1107,12 @@ public class TXState implements TXStateInterface {
    */
   @Override
   public synchronized void afterCompletion(int status) {
-    this.proxy.getTxMgr().setTXState(null);
-    // For commit, beforeCompletion should be called. Otherwise
+    proxy.getTxMgr().setTXState(null);
+    // if there was a beforeCompletion call then there will be a thread
+    // sitting in the waiting pool to execute afterCompletion. Otherwise
     // throw FailedSynchronizationException().
-    if (wasBeforeCompletionCalled()) {
-      doAfterCompletion(status);
+    if (beforeCompletionCalled) {
+      singleThreadJTAExecutor.executeAfterCompletion(getCancelCriterion(), status);
     } else {
       // rollback does not run beforeCompletion.
       if (status != Status.STATUS_ROLLEDBACK) {
@@ -1087,7 +1123,7 @@ public class TXState implements TXStateInterface {
     }
   }
 
-  private void doAfterCompletion(int status) {
+  void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
     try {
       switch (status) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
new file mode 100644
index 0000000..d94df0e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AfterCompletionTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+
+public class AfterCompletionTest {
+  private AfterCompletion afterCompletion;
+  private CancelCriterion cancelCriterion;
+  private TXState txState;
+  private Thread doOpThread;
+
+  @Before
+  public void setup() {
+    afterCompletion = new AfterCompletion();
+    cancelCriterion = mock(CancelCriterion.class);
+    txState = mock(TXState.class);
+  }
+
+  @Test
+  public void executeThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void cancelThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void isStartedReturnsFalseIfNotExecuted() {
+    assertThat(afterCompletion.isStarted()).isFalse();
+  }
+
+  @Test
+  public void isStartedReturnsTrueIfExecuted() {
+    startDoOp();
+
+    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+
+    verifyDoOpFinished();
+    assertThat(afterCompletion.isStarted()).isTrue();
+  }
+
+  @Test
+  public void executeCallsDoAfterCompletion() {
+    startDoOp();
+
+    afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED);
+    verifyDoOpFinished();
+    verify(txState, times(1)).doAfterCompletion(eq(Status.STATUS_COMMITTED));
+  }
+
+  @Test
+  public void executeThrowsDoAfterCompletionThrows() {
+    startDoOp();
+    doThrow(new RuntimeException()).when(txState).doAfterCompletion(Status.STATUS_COMMITTED);
+
+    assertThatThrownBy(() -> afterCompletion.execute(cancelCriterion, Status.STATUS_COMMITTED))
+        .isInstanceOf(RuntimeException.class);
+
+    verifyDoOpFinished();
+  }
+
+  @Test
+  public void cancelCallsDoCleanup() {
+    startDoOp();
+
+    afterCompletion.cancel(cancelCriterion);
+    verifyDoOpFinished();
+    verify(txState, times(1)).doCleanup();
+  }
+
+  @Test
+  public void cancelThrowsDoCleanupThrows() {
+    startDoOp();
+    doThrow(new RuntimeException()).when(txState).doCleanup();
+
+    assertThatThrownBy(() -> afterCompletion.cancel(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+
+    verifyDoOpFinished();
+  }
+
+  private void startDoOp() {
+    doOpThread = new Thread(() -> afterCompletion.doOp(txState, cancelCriterion));
+    doOpThread.start();
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null));
+
+  }
+
+  private void verifyDoOpFinished() {
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !doOpThread.isAlive());
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
new file mode 100644
index 0000000..1f541b6
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BeforeCompletionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+
+public class BeforeCompletionTest {
+
+  private BeforeCompletion beforeCompletion;
+  private CancelCriterion cancelCriterion;
+  private TXState txState;
+
+  @Before
+  public void setup() {
+    beforeCompletion = new BeforeCompletion();
+    cancelCriterion = mock(CancelCriterion.class);
+    txState = mock(TXState.class);
+  }
+
+  @Test
+  public void executeThrowsExceptionIfDoOpFailedWithException() {
+    doThrow(new SynchronizationCommitConflictException("")).when(txState).doBeforeCompletion();
+
+    beforeCompletion.doOp(txState);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(SynchronizationCommitConflictException.class);
+  }
+
+  @Test
+  public void doOpCallsDoBeforeCompletion() {
+    beforeCompletion.doOp(txState);
+
+    verify(txState, times(1)).doBeforeCompletion();
+  }
+
+  @Test
+  public void isStartedReturnsFalseIfNotExecuted() {
+    assertThat(beforeCompletion.isStarted()).isFalse();
+  }
+
+  @Test
+  public void isStartedReturnsTrueIfExecuted() {
+    beforeCompletion.doOp(txState);
+    beforeCompletion.execute(cancelCriterion);
+
+    assertThat(beforeCompletion.isStarted()).isTrue();
+  }
+
+  @Test
+  public void executeThrowsIfCancelCriterionThrows() {
+    doThrow(new RuntimeException()).when(cancelCriterion).checkCancelInProgress(null);
+
+    assertThatThrownBy(() -> beforeCompletion.execute(cancelCriterion))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  @Test
+  public void executeWaitsUntilDoOpFinish() throws Exception {
+    Thread thread = new Thread(() -> beforeCompletion.execute(cancelCriterion));
+    thread.start();
+    // give the thread a chance to get past the "finished" check by waiting until
+    // checkCancelInProgress is called
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .untilAsserted(() -> verify(cancelCriterion, times(1)).checkCancelInProgress(null));
+
+    beforeCompletion.doOp(txState);
+
+    Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> !(thread.isAlive()));
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
new file mode 100644
index 0000000..bc48eb4
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SingleThreadJTAExecutorTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import org.apache.geode.CancelCriterion;
+
+public class SingleThreadJTAExecutorTest {
+  private SingleThreadJTAExecutor singleThreadJTAExecutor;
+  private TXState txState;
+  private ExecutorService executor;
+  private BeforeCompletion beforeCompletion;
+  private AfterCompletion afterCompletion;
+  private CancelCriterion cancelCriterion;
+
+  @Before
+  public void setup() {
+    txState = mock(TXState.class, RETURNS_DEEP_STUBS);
+    executor = Executors.newSingleThreadExecutor();
+    beforeCompletion = mock(BeforeCompletion.class);
+    afterCompletion = mock(AfterCompletion.class);
+    cancelCriterion = mock(CancelCriterion.class);
+    singleThreadJTAExecutor = new SingleThreadJTAExecutor(beforeCompletion, afterCompletion);
+  }
+
+  @Test
+  public void executeBeforeCompletionCallsDoOps() {
+    InOrder inOrder = inOrder(beforeCompletion, afterCompletion);
+
+    singleThreadJTAExecutor.executeBeforeCompletion(txState, executor, cancelCriterion);
+
+    verify(beforeCompletion, times(1)).execute(eq(cancelCriterion));
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .untilAsserted(() -> inOrder.verify(beforeCompletion, times(1)).doOp(eq(txState)));
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
+        () -> inOrder.verify(afterCompletion, times(1)).doOp(eq(txState), eq(cancelCriterion)));
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 5ec4cbd..21fadf0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -32,10 +33,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
-import org.apache.geode.cache.TransactionException;
-import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 
 public class TXStateTest {
   private TXStateProxyImpl txStateProxy;
@@ -44,60 +44,51 @@ public class TXStateTest {
 
   @Before
   public void setup() {
-    txStateProxy = mock(TXStateProxyImpl.class);
+    txStateProxy = mock(TXStateProxyImpl.class, RETURNS_DEEP_STUBS);
     exception = new CommitConflictException("");
     transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
 
     when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
   }
 
-
   @Test
-  public void beforeCompletionThrowsIfReserveAndCheckFails() {
+  public void doBeforeCompletionThrowsIfReserveAndCheckFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
     doThrow(exception).when(txState).reserveAndCheck();
 
-    assertThatThrownBy(() -> txState.beforeCompletion())
+    assertThatThrownBy(() -> txState.doBeforeCompletion())
         .isInstanceOf(SynchronizationCommitConflictException.class);
   }
 
-
   @Test
-  public void afterCompletionThrowsIfCommitFails() {
+  public void doAfterCompletionThrowsIfCommitFails() {
     TXState txState = spy(new TXState(txStateProxy, true));
-    doReturn(mock(InternalCache.class)).when(txState).getCache();
     doReturn(true).when(txState).wasBeforeCompletionCalled();
     txState.reserveAndCheck();
     doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
 
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+    assertThatThrownBy(() -> txState.doAfterCompletion(Status.STATUS_COMMITTED))
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
   @Test
-  public void afterCompletionThrowsTransactionExceptionIfCommitFailedCommitConflictException() {
-    TXState txState = spy(new TXState(txStateProxy, true));
-    doReturn(mock(InternalCache.class)).when(txState).getCache();
-    doReturn(true).when(txState).wasBeforeCompletionCalled();
-    doThrow(exception).when(txState).commit();
-
-    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
-        .isInstanceOf(TransactionException.class);
-  }
-
-  @Test
-  public void afterCompletionCanCommitJTA() {
+  public void doAfterCompletionCanCommitJTA() {
     TXState txState = spy(new TXState(txStateProxy, false));
-    doReturn(mock(InternalCache.class)).when(txState).getCache();
     txState.reserveAndCheck();
     txState.closed = true;
     doReturn(true).when(txState).wasBeforeCompletionCalled();
-    txState.afterCompletion(Status.STATUS_COMMITTED);
+    txState.doAfterCompletion(Status.STATUS_COMMITTED);
 
     assertThat(txState.locks).isNull();
     verify(txState, times(1)).saveTXCommitMessageForClientFailover();
   }
 
+  @Test(expected = FailedSynchronizationException.class)
+  public void afterCompletionThrowsExceptionIfBeforeCompletionNotCalled() {
+    TXState txState = new TXState(txStateProxy, true);
+    txState.afterCompletion(Status.STATUS_COMMITTED);
+  }
+
   @Test
   public void afterCompletionCanRollbackJTA() {
     TXState txState = spy(new TXState(txStateProxy, true));
@@ -153,16 +144,7 @@ public class TXStateTest {
   public void getOriginatingMemberReturnsNullIfNotOriginatedFromClient() {
     TXState txState = spy(new TXState(txStateProxy, false));
 
-    assertThat(txState.getOriginatingMember()).isNull();
+    assertThat(txState.getOriginatingMember()).isSameAs(txStateProxy.getOnBehalfOfClientMember());
   }
 
-  @Test
-  public void getOriginatingMemberReturnsClientMemberIfOriginatedFromClient() {
-    InternalDistributedMember client = mock(InternalDistributedMember.class);
-    TXStateProxyImpl proxy = new TXStateProxyImpl(mock(InternalCache.class),
-        mock(TXManagerImpl.class), mock(TXId.class), client);
-    TXState txState = spy(new TXState(proxy, false));
-
-    assertThat(txState.getOriginatingMember()).isEqualTo(client);
-  }
 }


Mime
View raw message