geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esh...@apache.org
Subject [geode] 01/05: GEODE-5376: Move SynchronizationRunnable to TXState
Date Tue, 17 Jul 2018 23:17:22 GMT
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5376
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 69d51c91a13ebc89d31ded924690c2a5e3c369e7
Author: eshu <eshu@pivotal.io>
AuthorDate: Tue Jul 10 12:01:51 2018 -0700

    GEODE-5376: Move SynchronizationRunnable to TXState
    
      Different threads can handle JTA beforeCompletion and afterCompletion after client failover.
      Use TXStateSynchronizationRunnable to handle this case.
---
 .../cache/DistTXStateProxyImplOnCoordinator.java   |   6 -
 .../internal/cache/PausedTXStateProxyImpl.java     |   8 -
 .../apache/geode/internal/cache/TXManagerImpl.java |   4 +
 .../org/apache/geode/internal/cache/TXState.java   | 159 +++++++++---
 .../apache/geode/internal/cache/TXStateProxy.java  |   9 -
 .../geode/internal/cache/TXStateProxyImpl.java     |  25 --
 ...le.java => TXStateSynchronizationRunnable.java} |  22 +-
 .../sockets/command/TXSynchronizationCommand.java  | 235 +++++++-----------
 .../geode/internal/cache/tx/ClientTXStateStub.java |   4 +
 ...ava => TXStateSynchronizationRunnableTest.java} |  54 ++--
 .../apache/geode/internal/cache/TXStateTest.java   | 152 ++++++++++++
 .../command/TXSynchronizationCommandTest.java      | 119 +++++++++
 .../ClientServerJTAFailoverDistributedTest.java    | 273 +++++++++++++++++++++
 13 files changed, 808 insertions(+), 262 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index c088c0f..031de9e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -117,9 +117,6 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
       }
 
       inProgress = preserveTx;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
@@ -281,9 +278,6 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
 
     } finally {
       inProgress = false;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
 
     /*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
index a796d5c..2e52551 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PausedTXStateProxyImpl.java
@@ -390,14 +390,6 @@ public class PausedTXStateProxyImpl implements TXStateProxy {
   public void setJCATransaction() {}
 
   @Override
-  public void setSynchronizationRunnable(TXSynchronizationRunnable sync) {}
-
-  @Override
-  public TXSynchronizationRunnable getSynchronizationRunnable() {
-    return null;
-  }
-
-  @Override
   public void suspend() {}
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index c5c7653..639656f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1875,4 +1875,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     return hostedTXStates;
   }
 
+  public boolean isHostedTXStatesEmpty() {
+    return hostedTXStates.isEmpty();
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 9768fb8..6b2ca35 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -24,23 +24,28 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
 
+import edu.umd.cs.findbugs.annotations.SuppressWarnings;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
+import org.apache.geode.InternalGemFireError;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.FailedSynchronizationException;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.SynchronizationCommitConflictException;
 import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.TransactionId;
 import org.apache.geode.cache.TransactionWriter;
 import org.apache.geode.cache.TransactionWriterException;
@@ -48,6 +53,7 @@ import org.apache.geode.cache.UnsupportedOperationInTransactionException;
 import org.apache.geode.cache.client.internal.ServerRegionDataAccess;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.TXManagerCancelledException;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.control.MemoryThresholds;
@@ -95,6 +101,15 @@ public class TXState implements TXStateInterface {
    */
   private int modSerialNum;
   private final List<EntryEventImpl> pendingCallbacks = new ArrayList<EntryEventImpl>();
+  /**
+   * for client/server JTA transactions we need to have a single thread handle both beforeCompletion
+   * and afterCompletion so that beforeCompletion can obtain locks for the afterCompletion step.
+   * This field holds the runnable that the thread executes.
+   */
+  protected volatile TXStateSynchronizationRunnable syncRunnable;
+
+  private volatile SynchronizationCommitConflictException beforeCompletionException;
+  private volatile RuntimeException afterCompletionException;
 
   // Internal testing hooks
   private Runnable internalAfterReservation;
@@ -901,6 +916,9 @@ public class TXState implements TXStateInterface {
       synchronized (this.completionGuard) {
         this.completionGuard.notifyAll();
       }
+      if (this.syncRunnable != null) {
+        this.syncRunnable.abort();
+      }
       if (iae != null && !this.proxy.getCache().isClosed()) {
         throw iae;
       }
@@ -1000,11 +1018,45 @@ public class TXState implements TXStateInterface {
     if (this.closed) {
       throw new TXManagerCancelledException();
     }
-    this.proxy.getTxMgr().setTXState(null);
-    final long opStart = CachePerfStats.getStatTime();
-    this.jtaLifeTime = opStart - getBeginTime();
+    TXStateSynchronizationRunnable sync = createTxStateSynchronizationRunnable();
+    setSynchronizationRunnable(sync);
+
+    Executor exec = getExecutor();
+    exec.execute(sync);
+    sync.waitForFirstExecution();
+    if (getBeforeCompletionException() != null) {
+      throw getBeforeCompletionException();
+    }
+  }
 
+  TXStateSynchronizationRunnable createTxStateSynchronizationRunnable() {
+    Runnable beforeCompletion = new Runnable() {
+      @SuppressWarnings("synthetic-access")
+      public void run() {
+        doBeforeCompletion();
+      }
+    };
 
+    return new TXStateSynchronizationRunnable(getCache().getCancelCriterion(),
+        beforeCompletion);
+  }
+
+  Executor getExecutor() {
+    return InternalDistributedSystem.getConnectedInstance().getDistributionManager()
+        .getWaitingThreadPool();
+  }
+
+  SynchronizationCommitConflictException getBeforeCompletionException() {
+    return beforeCompletionException;
+  }
+
+  private void setSynchronizationRunnable(TXStateSynchronizationRunnable synchronizationRunnable) {
+    syncRunnable = synchronizationRunnable;
+  }
+
+  void doBeforeCompletion() {
+    final long opStart = CachePerfStats.getStatTime();
+    this.jtaLifeTime = opStart - getBeginTime();
     try {
       reserveAndCheck();
       /*
@@ -1042,8 +1094,8 @@ public class TXState implements TXStateInterface {
       }
     } catch (CommitConflictException commitConflict) {
       cleanup();
-      this.proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
-      throw new SynchronizationCommitConflictException(
+      proxy.getTxMgr().noteCommitFailure(opStart, this.jtaLifeTime, this);
+      beforeCompletionException = new SynchronizationCommitConflictException(
           LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0
               .toLocalizedString(getTransactionId()),
           commitConflict);
@@ -1057,40 +1109,79 @@ public class TXState implements TXStateInterface {
    */
   @Override
   public void afterCompletion(int status) {
-    // System.err.println("start afterCompletion");
+    this.proxy.getTxMgr().setTXState(null);
+
+    Runnable afterCompletion = new Runnable() {
+      @SuppressWarnings("synthetic-access")
+      public void run() {
+        doAfterCompletion(status);
+      }
+    };
+    // if there was a beforeCompletion call then there will be a thread
+    // sitting in the waiting pool to execute afterCompletion. Otherwise
+    // throw FailedSynchronizationException().
+    TXStateSynchronizationRunnable sync = getSynchronizationRunnable();
+    if (sync != null) {
+      sync.runSecondRunnable(afterCompletion);
+      if (getAfterCompletionException() != null) {
+        throw getAfterCompletionException();
+      }
+    } else {
+      // rollback does not run beforeCompletion.
+      if (status != Status.STATUS_ROLLEDBACK) {
+        throw new FailedSynchronizationException(
+            "Could not execute afterCompletion when beforeCompletion was not executed");
+      }
+      doAfterCompletion(status);
+    }
+  }
+
+  TXStateSynchronizationRunnable getSynchronizationRunnable() {
+    return this.syncRunnable;
+  }
+
+  RuntimeException getAfterCompletionException() {
+    return afterCompletionException;
+  }
+
+  void doAfterCompletion(int status) {
     final long opStart = CachePerfStats.getStatTime();
-    switch (status) {
-      case Status.STATUS_COMMITTED:
-        // System.err.println("begin commit in afterCompletion");
-        Assert.assertTrue(this.locks != null,
-            "Gemfire Transaction afterCompletion called with illegal state.");
-        try {
-          proxy.getTxMgr().setTXState(null);
-          commit();
-          saveTXCommitMessageForClientFailover();
-        } catch (CommitConflictException error) {
-          Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
-              + " afterCompletion failed.due to CommitConflictException: " + error);
-        }
+    try {
+      switch (status) {
+        case Status.STATUS_COMMITTED:
+          Assert.assertTrue(this.locks != null,
+              "Gemfire Transaction afterCompletion called with illegal state.");
+          try {
+            commit();
+            saveTXCommitMessageForClientFailover();
+          } catch (CommitConflictException error) {
+            Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
+                + " afterCompletion failed.due to CommitConflictException: " + error);
+          }
 
-        this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
-        this.locks = null;
-        // System.err.println("end commit in afterCompletion");
-        break;
-      case Status.STATUS_ROLLEDBACK:
-        this.jtaLifeTime = opStart - getBeginTime();
-        this.proxy.getTxMgr().setTXState(null);
-        rollback();
-        saveTXCommitMessageForClientFailover();
-        this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
-        break;
-      default:
-        Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+          this.proxy.getTxMgr().noteCommitSuccess(opStart, this.jtaLifeTime, this);
+          this.locks = null;
+          break;
+        case Status.STATUS_ROLLEDBACK:
+          this.jtaLifeTime = opStart - getBeginTime();
+          rollback();
+          saveTXCommitMessageForClientFailover();
+          this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
+          break;
+        default:
+          Assert.assertTrue(false, "Unknown JTA Synchronization status " + status);
+      }
+    } catch (RuntimeException exception) {
+      LogService.getLogger().info("got exception " + exception);
+      afterCompletionException = exception;
+    } catch (InternalGemFireError error) {
+      TransactionException exception = new TransactionException(error);
+      afterCompletionException = exception;
     }
-    // System.err.println("end afterCompletion");
+
   }
 
-  private void saveTXCommitMessageForClientFailover() {
+  void saveTXCommitMessageForClientFailover() {
     proxy.getTxMgr().saveTXStateForClientFailover(proxy);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
index f037a02..7a79914 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxy.java
@@ -54,15 +54,6 @@ public interface TXStateProxy extends TXStateInterface {
   void setJCATransaction();
 
   /**
-   * establishes the synchronization thread used for client/server beforeCompletion/afterCompletion
-   * processing
-   *
-   */
-  void setSynchronizationRunnable(TXSynchronizationRunnable sync);
-
-  TXSynchronizationRunnable getSynchronizationRunnable();
-
-  /**
    * Perform additional tasks required by the proxy to suspend a transaction
    */
   void suspend();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index f9aa2d4..00f15c3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -57,12 +57,6 @@ public class TXStateProxyImpl implements TXStateProxy {
   private boolean commitRequestedByOwner;
   private boolean isJCATransaction;
 
-  /**
-   * for client/server JTA transactions we need to have a single thread handle both beforeCompletion
-   * and afterCompletion so that beforeC can obtain locks for the afterC step. This is that thread
-   */
-  protected volatile TXSynchronizationRunnable synchRunnable;
-
   private final ReentrantLock lock = new ReentrantLock();
 
   /** number of operations in this transaction */
@@ -99,16 +93,6 @@ public class TXStateProxyImpl implements TXStateProxy {
   }
 
   @Override
-  public void setSynchronizationRunnable(TXSynchronizationRunnable synch) {
-    this.synchRunnable = synch;
-  }
-
-  @Override
-  public TXSynchronizationRunnable getSynchronizationRunnable() {
-    return this.synchRunnable;
-  }
-
-  @Override
   public ReentrantLock getLock() {
     return this.lock;
   }
@@ -230,9 +214,6 @@ public class TXStateProxyImpl implements TXStateProxy {
       throw e;
     } finally {
       inProgress = preserveTx;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
@@ -410,9 +391,6 @@ public class TXStateProxyImpl implements TXStateProxy {
       getRealDeal(null, null).rollback();
     } finally {
       inProgress = false;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
@@ -472,9 +450,6 @@ public class TXStateProxyImpl implements TXStateProxy {
       getRealDeal(null, null).afterCompletion(status);
     } finally {
       this.inProgress = false;
-      if (this.synchRunnable != null) {
-        this.synchRunnable.abort();
-      }
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
similarity index 86%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
index 4603d93..415bcdd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXSynchronizationRunnable.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnable.java
@@ -17,22 +17,20 @@ package org.apache.geode.internal.cache;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
 import org.apache.geode.internal.logging.LogService;
 
 /**
- * TXSynchronizationThread manages beforeCompletion and afterCompletion calls on behalf of a client
- * cache. The thread should be instantiated with a Runnable that invokes beforeCompletion behavior.
+ * TXStateSynchronizationRunnable manages beforeCompletion and afterCompletion calls in TXState.
+ * It should be instantiated with a Runnable that invokes beforeCompletion behavior.
  * Then you must invoke runSecondRunnable() with another Runnable that invokes afterCompletion
  * behavior.
  *
- * @since GemFire 6.6
+ * @since Geode 1.8.0
  */
-public class TXSynchronizationRunnable implements Runnable {
+public class TXStateSynchronizationRunnable implements Runnable {
   private static final Logger logger = LogService.getLogger();
 
   private final CancelCriterion cancelCriterion;
-  private final CommBufferPool commBufferPool;
 
   private Runnable firstRunnable;
   private final Object firstRunnableSync = new Object();
@@ -44,21 +42,15 @@ public class TXSynchronizationRunnable implements Runnable {
 
   private boolean abort;
 
-  public TXSynchronizationRunnable(final CancelCriterion cancelCriterion,
-      final CommBufferPool commBufferPool, final Runnable beforeCompletion) {
+  public TXStateSynchronizationRunnable(final CancelCriterion cancelCriterion,
+      final Runnable beforeCompletion) {
     this.cancelCriterion = cancelCriterion;
-    this.commBufferPool = commBufferPool;
     this.firstRunnable = beforeCompletion;
   }
 
   @Override
   public void run() {
-    commBufferPool.setTLCommBuffer();
-    try {
-      doSynchronizationOps();
-    } finally {
-      commBufferPool.releaseTLCommBuffer();
-    }
+    doSynchronizationOps();
   }
 
   private void doSynchronizationOps() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index fd9c17f..037702a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -16,19 +16,14 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
-import java.util.concurrent.Executor;
-
-import javax.transaction.Status;
 
 import org.apache.geode.cache.client.internal.TXSynchronizationOp.CompletionType;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.TXCommitMessage;
 import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
-import org.apache.geode.internal.cache.TXSynchronizationRunnable;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -50,21 +45,6 @@ public class TXSynchronizationCommand extends BaseCommand {
    * (non-Javadoc)
    *
    * @see
-   * org.apache.geode.internal.cache.tier.sockets.BaseCommand#shouldMasqueradeForTx(org.apache.geode
-   * .internal.cache.tier.sockets.Message,
-   * org.apache.geode.internal.cache.tier.sockets.ServerConnection)
-   */
-  @Override
-  protected boolean shouldMasqueradeForTx(Message clientMessage,
-      ServerConnection serverConnection) {
-    // masquerading is done in the waiting thread pool
-    return false;
-  }
-
-  /*
-   * (non-Javadoc)
-   *
-   * @see
    * org.apache.geode.internal.cache.tier.sockets.BaseCommand#cmdExecute(org.apache.geode.internal.
    * cache.tier.sockets.Message, org.apache.geode.internal.cache.tier.sockets.ServerConnection,
    * long)
@@ -86,20 +66,18 @@ public class TXSynchronizationCommand extends BaseCommand {
       statusPart = null;
     }
 
-    final TXManagerImpl txMgr =
-        (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
-    final InternalDistributedMember member =
-        (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+    final TXManagerImpl txMgr = getTXManager(serverConnection);
+    final InternalDistributedMember member = getDistributedMember(serverConnection);
 
-    // get the tx state without associating it with this thread. That's done later
-    final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true);
+    final TXStateProxy txProxy = txMgr.getTXState();
+    assert txProxy != null;
 
     final TXId txId = txProxy.getTxId();
     TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     if (commitMessage != null && commitMessage != TXCommitMessage.ROLLBACK_MSG) {
       assert type == CompletionType.AFTER_COMPLETION;
       try {
-        CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection);
+        writeCommitResponse(clientMessage, serverConnection, commitMessage);
       } catch (IOException e) {
         if (isDebugEnabled) {
           logger.debug("Problem writing reply to client", e);
@@ -117,137 +95,92 @@ public class TXSynchronizationCommand extends BaseCommand {
       return;
     }
 
-    // we have to run beforeCompletion and afterCompletion in the same thread
-    // because beforeCompletion obtains locks for the thread and afterCompletion
-    // releases them
-    if (txProxy != null) {
-      try {
-        if (type == CompletionType.BEFORE_COMPLETION) {
-          Runnable beforeCompletion = new Runnable() {
-            @SuppressWarnings("synthetic-access")
-            public void run() {
-              TXStateProxy txState = null;
-              Throwable failureException = null;
-              try {
-                txState = txMgr.masqueradeAs(clientMessage, member, false);
-                if (isDebugEnabled) {
-                  logger.debug("Executing beforeCompletion() notification for transaction {}",
-                      clientMessage.getTransactionId());
-                }
-                txState.setIsJTA(true);
-                txState.beforeCompletion();
-                try {
-                  writeReply(clientMessage, serverConnection);
-                } catch (IOException e) {
-                  if (isDebugEnabled) {
-                    logger.debug("Problem writing reply to client", e);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              } catch (ReplyException e) {
-                failureException = e.getCause();
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-              } catch (Exception e) {
-                failureException = e;
-              } finally {
-                txMgr.unmasquerade(txState);
-              }
-              if (failureException != null) {
-                try {
-                  writeException(clientMessage, failureException, false, serverConnection);
-                } catch (IOException ioe) {
-                  if (isDebugEnabled) {
-                    logger.debug("Problem writing reply to client", ioe);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              }
+    try {
+      if (type == CompletionType.BEFORE_COMPLETION) {
+        if (isDebugEnabled) {
+          logger.debug("Executing beforeCompletion() notification for transaction {}",
+              clientMessage.getTransactionId());
+        }
+        Throwable failureException = null;
+        try {
+          txProxy.setIsJTA(true);
+          txProxy.beforeCompletion();
+          try {
+            writeReply(clientMessage, serverConnection);
+          } catch (IOException e) {
+            if (isDebugEnabled) {
+              logger.debug("Problem writing reply to client", e);
+            }
+          }
+          serverConnection.setAsTrue(RESPONDED);
+        } catch (ReplyException e) {
+          failureException = e.getCause();
+        } catch (Exception e) {
+          failureException = e;
+        }
+        if (failureException != null) {
+          try {
+            writeException(clientMessage, failureException, false, serverConnection);
+          } catch (IOException ioe) {
+            if (isDebugEnabled) {
+              logger.debug("Problem writing reply to client", ioe);
             }
-          };
-          TXSynchronizationRunnable sync =
-              new TXSynchronizationRunnable(serverConnection.getCache().getCancelCriterion(),
-                  serverConnection.getAcceptor(), beforeCompletion);
-          txProxy.setSynchronizationRunnable(sync);
-          Executor exec = InternalDistributedSystem.getConnectedInstance().getDistributionManager()
-              .getWaitingThreadPool();
-          exec.execute(sync);
-          sync.waitForFirstExecution();
-        } else {
-          Runnable afterCompletion = new Runnable() {
-            @SuppressWarnings("synthetic-access")
-            public void run() {
-              TXStateProxy txState = null;
-              try {
-                txState = txMgr.masqueradeAs(clientMessage, member, false);
-                int status = statusPart.getInt();
-                if (isDebugEnabled) {
-                  logger.debug("Executing afterCompletion({}) notification for transaction {}",
-                      status, clientMessage.getTransactionId());
-                }
-                txState.setIsJTA(true);
-                txState.afterCompletion(status);
-                // GemFire commits during afterCompletion - send the commit info back to the client
-                // where it can be applied to the local cache
-                TXCommitMessage cmsg = txState.getCommitMessage();
-                try {
-                  CommitCommand.writeCommitResponse(cmsg, clientMessage, serverConnection);
-                  txMgr.removeHostedTXState(txState.getTxId());
-                } catch (IOException e) {
-                  // not much can be done here
-                  if (isDebugEnabled || (e instanceof MessageTooLargeException)) {
-                    logger.warn("Problem writing reply to client", e);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              } catch (RuntimeException e) {
-                try {
-                  writeException(clientMessage, e, false, serverConnection);
-                } catch (IOException ioe) {
-                  if (isDebugEnabled) {
-                    logger.debug("Problem writing reply to client", ioe);
-                  }
-                }
-                serverConnection.setAsTrue(RESPONDED);
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-              } finally {
-                txMgr.unmasquerade(txState);
-              }
+          }
+          serverConnection.setAsTrue(RESPONDED);
+        }
+      } else {
+        try {
+          int status = statusPart.getInt();
+          if (isDebugEnabled) {
+            logger.debug("Executing afterCompletion({}) notification for transaction {}",
+                status, clientMessage.getTransactionId());
+          }
+          txProxy.setIsJTA(true);
+          txProxy.setCommitOnBehalfOfRemoteStub(true);
+          txProxy.afterCompletion(status);
+          // GemFire commits during afterCompletion - send the commit info back to the client
+          // where it can be applied to the local cache
+          TXCommitMessage cmsg = txProxy.getCommitMessage();
+          try {
+            writeCommitResponse(clientMessage, serverConnection, cmsg);
+            txMgr.removeHostedTXState(txProxy.getTxId());
+          } catch (IOException e) {
+            // not much can be done here
+            if (isDebugEnabled || (e instanceof MessageTooLargeException)) {
+              logger.warn("Problem writing reply to client", e);
             }
-          };
-          // if there was a beforeCompletion call then there will be a thread
-          // sitting in the waiting pool to execute afterCompletion. Otherwise
-          // we have failed-over and may need to do beforeCompletion & hope that it works
-          TXSynchronizationRunnable sync = txProxy.getSynchronizationRunnable();
-          if (sync != null) {
-            sync.runSecondRunnable(afterCompletion);
-          } else {
-            if (statusPart.getInt() == Status.STATUS_COMMITTED) {
-              TXStateProxy txState = txMgr.masqueradeAs(clientMessage, member, false);
-              try {
-                if (isDebugEnabled) {
-                  logger.debug(
-                      "Executing beforeCompletion() notification for transaction {} after failover",
-                      clientMessage.getTransactionId());
-                }
-                txState.setIsJTA(true);
-                txState.beforeCompletion();
-              } finally {
-                txMgr.unmasquerade(txState);
-              }
+          }
+          serverConnection.setAsTrue(RESPONDED);
+        } catch (RuntimeException e) {
+          try {
+            writeException(clientMessage, e, false, serverConnection);
+          } catch (IOException ioe) {
+            if (isDebugEnabled) {
+              logger.debug("Problem writing reply to client", ioe);
             }
-            afterCompletion.run();
           }
+          serverConnection.setAsTrue(RESPONDED);
         }
-      } catch (Exception e) {
-        writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection);
-        serverConnection.setAsTrue(RESPONDED);
-      }
-      if (isDebugEnabled) {
-        logger.debug("Sent tx synchronization response");
       }
+    } catch (Exception e) {
+      writeException(clientMessage, MessageType.EXCEPTION, e, false, serverConnection);
+      serverConnection.setAsTrue(RESPONDED);
+    }
+    if (isDebugEnabled) {
+      logger.debug("Sent tx synchronization response");
     }
   }
 
+  void writeCommitResponse(Message clientMessage, ServerConnection serverConnection,
+      TXCommitMessage commitMessage) throws IOException {
+    CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection);
+  }
+
+  InternalDistributedMember getDistributedMember(ServerConnection serverConnection) {
+    return (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+  }
+
+  TXManagerImpl getTXManager(ServerConnection serverConnection) {
+    return (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
index c97d9f0..4469b71 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/ClientTXStateStub.java
@@ -322,4 +322,8 @@ public class ClientTXStateStub extends TXStateStub {
   public void setAfterLocalLocks(Runnable afterLocalLocks) {
     this.internalAfterLocalLocks = afterLocalLocks;
   }
+
+  public ServerLocation getServerAffinityLocation() {
+    return serverAffinityLocation;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
similarity index 52%
rename from geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
index a6ba2f5..05b811b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXSynchronizationRunnableTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateSynchronizationRunnableTest.java
@@ -18,6 +18,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -25,40 +28,63 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.internal.cache.tier.sockets.CommBufferPool;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
-public class TXSynchronizationRunnableTest {
-
+public class TXStateSynchronizationRunnableTest {
   private CancelCriterion cancelCriterion;
-  private CommBufferPool commBufferPool;
   private Runnable beforeCompletion;
+  private Runnable afterCompletion;
   private CacheClosedException exception;
 
-
   @Before
   public void setUp() {
     exception = new CacheClosedException();
 
     cancelCriterion = mock(CancelCriterion.class);
-    commBufferPool = mock(CommBufferPool.class);
     beforeCompletion = mock(Runnable.class);
+    afterCompletion = mock(Runnable.class);
+  }
 
+  @Test
+  public void waitForFirstExecutionThrowsExceptionIfCacheClosed() {
     doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
+    TXStateSynchronizationRunnable runnable =
+        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+    assertThatThrownBy(() -> runnable.waitForFirstExecution()).isSameAs(exception);
   }
 
   @Test
-  public void test() {
-    TXSynchronizationRunnable runnable =
-        new TXSynchronizationRunnable(cancelCriterion, commBufferPool, beforeCompletion);
-    assertThatThrownBy(() -> runnable.waitForFirstExecution()).isSameAs(exception);
+  public void runSecondRunnableThrowsExceptionIfCacheClosed() {
+    doThrow(exception).when(cancelCriterion).checkCancelInProgress(any());
+    TXStateSynchronizationRunnable runnable =
+        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+    assertThatThrownBy(() -> runnable.runSecondRunnable(afterCompletion)).isSameAs(exception);
   }
 
   @Test
-  public void test1() {
-    TXSynchronizationRunnable runnable =
-        new TXSynchronizationRunnable(cancelCriterion, commBufferPool, beforeCompletion);
-    assertThatThrownBy(() -> runnable.runSecondRunnable(mock(Runnable.class))).isSameAs(exception);
+  public void doSynchronizationOpsWaitsUntilRunSecondRunnable() {
+    TXStateSynchronizationRunnable runnable =
+        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+    new Thread(() -> {
+      runnable.runSecondRunnable(afterCompletion);
+    }).start();
+    runnable.run();
+    verify(beforeCompletion, times(1)).run();
+    verify(afterCompletion, times(1)).run();
   }
+
+  @Test
+  public void doSynchronizationOpsDoesNotRunSecondRunnableIfAborted() {
+    TXStateSynchronizationRunnable runnable =
+        new TXStateSynchronizationRunnable(cancelCriterion, beforeCompletion);
+    runnable.abort();
+    new Thread(() -> {
+      runnable.runSecondRunnable(afterCompletion);
+    }).start();
+    runnable.run();
+    verify(beforeCompletion, times(1)).run();
+    verify(afterCompletion, never()).run();
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
new file mode 100644
index 0000000..0dce944
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.Executor;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.SynchronizationCommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TXStateTest {
+  private CancelCriterion cancelCriterion;
+
+  private TXStateProxyImpl txStateProxy;
+  private CommitConflictException exception;
+  private TXStateSynchronizationRunnable txStateSynch;
+  private SynchronizationCommitConflictException synchronizationCommitConflictException;
+  private RuntimeException runtimeException;
+  private TransactionDataNodeHasDepartedException transactionDataNodeHasDepartedException;
+
+  @Before
+  public void setup() {
+    txStateProxy = mock(TXStateProxyImpl.class);
+
+    cancelCriterion = mock(CancelCriterion.class);
+    exception = new CommitConflictException("");
+    txStateSynch = mock(TXStateSynchronizationRunnable.class);
+    synchronizationCommitConflictException = new SynchronizationCommitConflictException("");
+    runtimeException = new RuntimeException();
+    transactionDataNodeHasDepartedException = new TransactionDataNodeHasDepartedException("");
+
+    when(txStateProxy.getTxMgr()).thenReturn(mock(TXManagerImpl.class));
+  }
+
+  @Test
+  public void beforeCompletionThrowsSynchronizationCommitConflictExceptionIfBeforeCompletionExceptionIsSet() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+
+    doReturn(mock(Executor.class)).when(txState).getExecutor();
+    doReturn(txStateSynch).when(txState).createTxStateSynchronizationRunnable();
+    doReturn(synchronizationCommitConflictException).when(txState).getBeforeCompletionException();
+
+    assertThatThrownBy(() -> txState.beforeCompletion())
+        .isSameAs(synchronizationCommitConflictException);
+  }
+
+  @Test
+  public void beforeCompletionExceptionIsSetWhenDoBeforeCompletionCouldNotLockKeys() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    doThrow(exception).when(txState).reserveAndCheck();
+
+    txState.doBeforeCompletion();
+    assertThat(txState.getBeforeCompletionException())
+        .isInstanceOf(SynchronizationCommitConflictException.class);
+  }
+
+
+  @Test
+  public void afterCompletionThrowsExceptionIfAfterCompletionExceptionIsSet() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+
+    doReturn(txStateSynch).when(txState).getSynchronizationRunnable();
+    doReturn(runtimeException).when(txState).getAfterCompletionException();
+
+    assertThatThrownBy(() -> txState.afterCompletion(Status.STATUS_COMMITTED))
+        .isSameAs(runtimeException);
+  }
+
+  @Test
+  public void afterCompletionExceptionIsSetWhenCommitFailedWithTransactionDataNodeHasDepartedException() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    doReturn(mock(InternalCache.class)).when(txState).getCache();
+    txState.reserveAndCheck();
+    doThrow(transactionDataNodeHasDepartedException).when(txState).commit();
+
+    txState.doAfterCompletion(Status.STATUS_COMMITTED);
+    assertThat(txState.getAfterCompletionException())
+        .isSameAs(transactionDataNodeHasDepartedException);
+  }
+
+  @Test
+  public void afterCompletionExceptionIsSetToTransactionExceptionWhenCommitFailedWithCommitConflictException() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    doReturn(mock(InternalCache.class)).when(txState).getCache();
+    txState.reserveAndCheck();
+    doThrow(exception).when(txState).commit();
+
+    txState.doAfterCompletion(Status.STATUS_COMMITTED);
+
+    assertThat(txState.getAfterCompletionException()).isInstanceOf(TransactionException.class);
+    TransactionException transactionException =
+        (TransactionException) txState.getAfterCompletionException();
+    assertThat(transactionException.getCause()).isInstanceOf(InternalGemFireError.class);
+  }
+
+
+  @Test
+  public void afterCompletionCanCommitJTA() {
+    TXState txState = spy(new TXState(txStateProxy, false));
+    doReturn(mock(InternalCache.class)).when(txState).getCache();
+    txState.reserveAndCheck();
+    txState.closed = true;
+    txState.doAfterCompletion(Status.STATUS_COMMITTED);
+
+    assertThat(txState.locks).isNull();
+    verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+  }
+
+  @Test
+  public void afterCompletionCanRollbackJTA() {
+    TXState txState = spy(new TXState(txStateProxy, true));
+    txState.afterCompletion(Status.STATUS_ROLLEDBACK);
+
+    verify(txState, times(1)).rollback();
+    verify(txState, times(1)).saveTXCommitMessageForClientFailover();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
new file mode 100644
index 0000000..adac845
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommandTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import javax.transaction.Status;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.client.internal.TXSynchronizationOp;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class TXSynchronizationCommandTest {
+  private Message clientMessage;
+  private ServerConnection serverConnection;
+  private TXManagerImpl txManager;
+  private TXStateProxyImpl txStateProxy;
+  private TXId txId;
+  private TXCommitMessage txCommitMessage;
+  private InternalDistributedMember member;
+  private Part part0;
+  private Part part1;
+  private Part part2;
+  private RuntimeException exception;
+  private TXSynchronizationCommand command;
+
+  @Before
+  public void setup() {
+    clientMessage = mock(Message.class);
+    serverConnection = mock(ServerConnection.class);
+    txManager = mock(TXManagerImpl.class);
+    member = mock(InternalDistributedMember.class);
+    txStateProxy = mock(TXStateProxyImpl.class);
+    txId = mock(TXId.class);
+    txCommitMessage = mock(TXCommitMessage.class);
+    part0 = mock(Part.class);
+    part1 = mock(Part.class);
+    part2 = mock(Part.class);
+    exception = new RuntimeException();
+    command = mock(TXSynchronizationCommand.class);
+
+    when(clientMessage.getPart(0)).thenReturn(part0);
+    when(clientMessage.getPart(1)).thenReturn(part1);
+    when(clientMessage.getPart(2)).thenReturn(part2);
+    doReturn(txManager).when(command).getTXManager(serverConnection);
+    doReturn(member).when(command).getDistributedMember(serverConnection);
+    when(txManager.getTXState()).thenReturn(txStateProxy);
+    when(txStateProxy.getTxId()).thenReturn(txId);
+  }
+
+  @Test
+  public void commandCanSendBackCommitMessageIfAlreadyCommitted() throws Exception {
+    when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+    when(txManager.getRecentlyCompletedMessage(txId)).thenReturn(txCommitMessage);
+    doNothing().when(command).writeCommitResponse(clientMessage, serverConnection, txCommitMessage);
+
+    doCallRealMethod().when(command).cmdExecute(clientMessage, serverConnection, null, 1);
+    command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+    verify(command, times(1)).writeCommitResponse(clientMessage, serverConnection, txCommitMessage);
+    verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+  }
+
+  @Test
+  public void commandCanInvokeBeforeCompletion() throws Exception {
+    when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.BEFORE_COMPLETION.ordinal());
+
+    doCallRealMethod().when(command).cmdExecute(clientMessage, serverConnection, null, 1);
+    command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+    verify(txStateProxy, times(1)).beforeCompletion();
+    verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+  }
+
+  @Test
+  public void commandCanSendBackCommitMessageAfterInvokeAfterCompletion() throws Exception {
+    when(part0.getInt()).thenReturn(TXSynchronizationOp.CompletionType.AFTER_COMPLETION.ordinal());
+    when(part2.getInt()).thenReturn(Status.STATUS_COMMITTED);
+    when(txStateProxy.getCommitMessage()).thenReturn(txCommitMessage);
+
+    doCallRealMethod().when(command).cmdExecute(clientMessage, serverConnection, null, 1);
+    command.cmdExecute(clientMessage, serverConnection, null, 1);
+
+    verify(txStateProxy, times(1)).afterCompletion(Status.STATUS_COMMITTED);
+    verify(command, times(1)).writeCommitResponse(clientMessage, serverConnection, txCommitMessage);
+    verify(serverConnection, times(1)).setAsTrue(Command.RESPONDED);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java
new file mode 100644
index 0000000..43ab209
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/functional/ClientServerJTAFailoverDistributedTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.jta.functional;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Status;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.internal.cache.tx.ClientTXStateStub;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class ClientServerJTAFailoverDistributedTest implements Serializable {
+  private String hostName;
+  private String uniqueName;
+  private String regionName;
+  private VM server1;
+  private VM server2;
+  private VM server3;
+  private VM client1;
+  private int port1;
+  private int port2;
+
+  private final int key = 1;
+  private final String value = "value1";
+  private final String newValue = "value2";
+
+  @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setUp() {
+    server1 = getVM(0);
+    server2 = getVM(1);
+    server3 = getVM(2);
+    client1 = getVM(3);
+
+    hostName = getHostName();
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+  }
+
+  @Test
+  public void jtaCanFailoverAfterDoneBeforeCompletion() {
+    server3.invoke(() -> createServerRegion(1, false));
+    server3.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+    port2 = server2.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+
+    Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+    int port = (Integer) beforeCompletionResults[1];
+
+    if (port == port1) {
+      server1.invoke(() -> cacheRule.getCache().close());
+    } else {
+      assert port == port2;
+      server2.invoke(() -> cacheRule.getCache().close());
+    }
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true));
+  }
+
+  private int createServerRegion(int totalNumBuckets, boolean isAccessor) throws Exception {
+    PartitionAttributesFactory factory = new PartitionAttributesFactory();
+    factory.setTotalNumBuckets(totalNumBuckets);
+    if (isAccessor) {
+      factory.setLocalMaxMemory(0);
+    }
+    PartitionAttributes partitionAttributes = factory.create();
+    cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setPartitionAttributes(partitionAttributes).create(regionName);
+
+    CacheServer server = cacheRule.getCache().addCacheServer();
+    server.setPort(0);
+    server.start();
+    return server.getPort();
+  }
+
+  private void createClientRegion(int... ports) {
+    clientCacheRule.createClientCache();
+
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    PoolImpl pool;
+    try {
+      pool = getPool(ports);
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+
+    ClientRegionFactory crf =
+        clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+    crf.setPoolName(pool.getName());
+    crf.create(regionName);
+
+    if (ports.length > 1) {
+      pool.acquireConnection(new ServerLocation(hostName, port1));
+    }
+  }
+
+  private PoolImpl getPool(int... ports) {
+    PoolFactory factory = PoolManager.createFactory();
+    for (int port : ports) {
+      factory.addServer(hostName, port);
+    }
+    return (PoolImpl) factory.setReadTimeout(2000).setSocketBufferSize(1000)
+        .setMinConnections(4).create(uniqueName);
+  }
+
+  private void doPut(int key, String value) {
+    cacheRule.getCache().getRegion(regionName).put(key, value);
+  }
+
+  private Object[] doBeforeCompletion() {
+    Object[] results = new Object[2];
+    InternalClientCache cache = clientCacheRule.getClientCache();
+    Region region = cache.getRegion(regionName);
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    txManager.begin();
+    region.put(key, newValue);
+
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
+    clientTXStateStub.beforeCompletion();
+    TransactionId transactionId = txManager.suspend();
+    int port = clientTXStateStub.getServerAffinityLocation().getPort();
+    results[0] = transactionId;
+    results[1] = port;
+    return results;
+  }
+
+  private void doAfterCompletion(TransactionId transactionId, boolean isCommit) {
+    InternalClientCache cache = clientCacheRule.getClientCache();
+    Region region = cache.getRegion(regionName);
+    TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    txManager.resume(transactionId);
+
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    ClientTXStateStub clientTXStateStub = (ClientTXStateStub) txStateProxy.getRealDeal(null, null);
+    try {
+      clientTXStateStub
+          .afterCompletion(isCommit ? Status.STATUS_COMMITTED : Status.STATUS_ROLLEDBACK);
+    } catch (Exception exception) {
+      LogService.getLogger().info("exception stack ", exception);
+      throw exception;
+    }
+    if (isCommit) {
+      assertEquals(newValue, region.get(key));
+    } else {
+      assertEquals(value, region.get(key));
+    }
+  }
+
+  @Test
+  public void jtaCanFailoverToJTAHostAfterDoneBeforeCompletion() {
+    port2 = server2.invoke(() -> createServerRegion(1, false));
+    server2.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+    Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+    server1.invoke(() -> cacheRule.getCache().close());
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], true));
+  }
+
+  @Test
+  public void jtaCanFailoverWithRollbackAfterDoneBeforeCompletion() {
+    server3.invoke(() -> createServerRegion(1, false));
+    server3.invoke(() -> doPut(key, value));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+    port2 = server2.invoke(() -> createServerRegion(1, true));
+
+    client1.invoke(() -> createClientRegion(port1, port2));
+
+    Object[] beforeCompletionResults = client1.invoke(() -> doBeforeCompletion());
+
+    int port = (Integer) beforeCompletionResults[1];
+
+    if (port == port1) {
+      server1.invoke(() -> cacheRule.getCache().close());
+    } else {
+      assert port == port2;
+      server2.invoke(() -> cacheRule.getCache().close());
+    }
+
+    client1.invoke(() -> doAfterCompletion((TransactionId) beforeCompletionResults[0], false));
+
+    createClientRegion(port == port1 ? port2 : port1);
+    doPutTransaction(true);
+  }
+
+  private void doPutTransaction(boolean isClient) {
+    Region region;
+    TXManagerImpl txManager;
+    if (isClient) {
+      InternalClientCache cache = clientCacheRule.getClientCache();
+      region = cache.getRegion(regionName);
+      txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+    } else {
+      InternalCache cache = cacheRule.getCache();
+      region = cache.getRegion(regionName);
+      txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).pollInterval(10, TimeUnit.MILLISECONDS)
+          .until(() -> txManager.isHostedTXStatesEmpty());
+    }
+    txManager.begin();
+    region.put(key, newValue);
+    txManager.commit();
+    assertEquals(newValue, region.get(key));
+  }
+
+}


Mime
View raw message