geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject [geode] branch develop updated: GEODE-5269 CommitConflictException after TransactionInDoubtException
Date Fri, 01 Jun 2018 20:27:06 GMT
This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f50b945  GEODE-5269 CommitConflictException after TransactionInDoubtException
f50b945 is described below

commit f50b9450b918ad4409c5a6ebf374b88de8c75986
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
AuthorDate: Fri Jun 1 13:26:05 2018 -0700

    GEODE-5269 CommitConflictException after TransactionInDoubtException
    
    Before sending a client a TransactionInDoubtException that is caused by
    a server shutting down we now wait a bit for the server to finish shutting
    down.  This allows any locks it held to be released and avoids a
    CommitConflictException if the client should immediately try another
    transaction with the same key(s).
---
 .../internal/ClusterDistributionManager.java       |  2 +-
 .../distributed/internal/DistributionManager.java  |  7 ++-
 .../internal/LonerDistributionManager.java         |  2 +-
 .../distributed/internal/direct/DirectChannel.java | 11 +++--
 .../geode/internal/cache/PeerTXStateStub.java      | 17 ++++---
 .../cache/tier/sockets/command/CommitCommand.java  | 50 +++++++++++++++++--
 .../tier/sockets/command/CommitCommandTest.java    | 57 ++++++++++++++++++++++
 .../dunit/cache/internal/JUnit4CacheTestCase.java  |  5 +-
 8 files changed, 128 insertions(+), 23 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 8b5d82c..a513627 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -2856,7 +2856,7 @@ public class ClusterDistributionManager implements DistributionManager {
   }
 
   @Override
-  public boolean isCurrentMember(InternalDistributedMember id) {
+  public boolean isCurrentMember(DistributedMember id) {
     Set m;
     synchronized (this.membersLock) {
       // access to members synchronized under membersLock in order to
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index 167f8ba..8bb84ae 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -306,8 +306,11 @@ public interface DistributionManager extends ReplySender {
   /** Returns a set of all roles currently in the distributed system. */
   Set getAllRoles();
 
-  /** Returns true if id is a current member of the distributed system */
-  boolean isCurrentMember(InternalDistributedMember id);
+  /**
+   * Returns true if id is a current member of the distributed system
+   *
+   */
+  boolean isCurrentMember(DistributedMember id);
 
   /**
    * Remove given member from list of members who are pending a startup reply
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index f74c34d..d6a6cd1 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -1256,7 +1256,7 @@ public class LonerDistributionManager implements DistributionManager {
     this.getId().setPort(this.lonerPort);
   }
 
-  public boolean isCurrentMember(InternalDistributedMember p_id) {
+  public boolean isCurrentMember(DistributedMember p_id) {
     return getId().equals(p_id);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index c766d26..970957f 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -20,6 +20,7 @@ import java.io.NotSerializableException;
 import java.net.InetAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -314,6 +315,10 @@ public class DirectChannel {
     if (!directReply && directMsg != null) {
       directMsg.registerProcessor();
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Sending ({}) to {} peers ({}) via tcp/ip",
+          msg, p_destinations.length, Arrays.toString(p_destinations));
+    }
 
     try {
       do {
@@ -375,9 +380,9 @@ public class DirectChannel {
         }
 
         try {
-          if (logger.isDebugEnabled()) {
-            logger.debug("{}{}) to {} peers ({}) via tcp/ip",
-                (retry ? "Retrying send (" : "Sending ("), msg, cons.size(), cons);
+          if (retry && logger.isDebugEnabled()) {
+            logger.debug("Retrying send ({}{}) to {} peers ({}) via tcp/ip",
+                msg, cons.size(), cons);
           }
           DMStats stats = getDMStats();
           List<?> sentCons; // used for cons we sent to this time
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
index 6211b6a..670bbb4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -131,14 +132,14 @@ public class PeerTXStateStub extends TXStateStub {
       }
     } catch (Exception e) {
       this.getCache().getCancelCriterion().checkCancelInProgress(e);
-      if (e.getCause() != null) {
-        if (e.getCause() instanceof ForceReattemptException) {
-          Throwable e2 = e.getCause();
-          if (e2.getCause() != null && e2.getCause() instanceof PrimaryBucketException) {
+      Throwable eCause = e.getCause();
+      if (eCause != null) {
+        if (eCause instanceof ForceReattemptException) {
+          if (eCause.getCause() instanceof PrimaryBucketException) {
             // data rebalanced
             TransactionDataRebalancedException tdnce =
-                new TransactionDataRebalancedException(e2.getCause().getMessage());
-            tdnce.initCause(e2.getCause());
+                new TransactionDataRebalancedException(eCause.getCause().getMessage());
+            tdnce.initCause(eCause.getCause());
             throw tdnce;
           } else {
             // We cannot be sure that the member departed starting to process commit request,
@@ -146,11 +147,11 @@ public class PeerTXStateStub extends TXStateStub {
             // fixes 44939
             TransactionInDoubtException tdnce =
                 new TransactionInDoubtException(e.getCause().getMessage());
-            tdnce.initCause(e.getCause());
+            tdnce.initCause(eCause);
             throw tdnce;
           }
         }
-        throw new TransactionInDoubtException(e.getCause());
+        throw new TransactionInDoubtException(eCause);
       } else {
         throw new TransactionInDoubtException(e);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index 936e9fb..efa085e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -15,7 +15,11 @@
 package org.apache.geode.internal.cache.tier.sockets.command;
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.geode.CancelException;
+import org.apache.geode.cache.TransactionInDoubtException;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.TXCommitMessage;
@@ -47,6 +51,8 @@ public class CommitCommand extends BaseCommand {
   @Override
   public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
       final SecurityService securityService, long start) throws IOException {
+
+
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
     TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
     InternalDistributedMember client =
@@ -74,18 +80,27 @@ public class CommitCommand extends BaseCommand {
     if (logger.isDebugEnabled()) {
       logger.debug("TX: committing client tx: {}", txId);
     }
-    try {
-
-      txId = txProxy.getTxId();
+    commitTransaction(clientMessage, serverConnection, txMgr, wasInProgress,
+        txProxy);
+  }
 
+  protected void commitTransaction(Message clientMessage, ServerConnection serverConnection,
+      TXManagerImpl txMgr,
+      boolean wasInProgress, TXStateProxy txProxy) throws IOException {
+    Exception txException = null;
+    TXCommitMessage commitMsg = null;
+    TXId txId = txProxy.getTxId();
+    try {
       txProxy.setCommitOnBehalfOfRemoteStub(true);
       txMgr.commit();
 
       commitMsg = txProxy.getCommitMessage();
+      logger.debug("Sending commit response to client: {}", commitMsg);
       writeCommitResponse(commitMsg, clientMessage, serverConnection);
       serverConnection.setAsTrue(RESPONDED);
+
     } catch (Exception e) {
-      sendException(clientMessage, serverConnection, e);
+      txException = e;
     } finally {
       if (txId != null) {
         txMgr.removeHostedTXState(txId);
@@ -97,10 +112,35 @@ public class CommitCommand extends BaseCommand {
         commitMsg.setClientVersion(null); // fixes bug 46529
       }
     }
+    if (txException != null) {
+      DistributedMember target = txProxy.getTarget();
+      // a TransactionInDoubtException caused by the TX host shutting down means that
+      // the transaction may still be active and hold locks. We must wait for the transaction
+      // host to finish shutting down before responding to the client or it could encounter
+      // conflicts in retrying the transaction
+      if ((txException instanceof TransactionInDoubtException)
+          && (txException.getCause() instanceof CancelException)) {
+        logger.info(
+            "Waiting for departure of {} before throwing TransactionInDoubtException.",
+            target);
+        try {
+          serverConnection.getCache().getDistributionManager().getMembershipManager()
+              .waitForDeparture(target);
+        } catch (TimeoutException e) {
+          // status will be logged below
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        logger.info("Done waiting.  Transaction host {} in the cluster.",
+            serverConnection.getCache().getDistributionManager().isCurrentMember(target)
+                ? "is still"
+                : "is no longer");
+      }
+      sendException(clientMessage, serverConnection, txException);
+    }
   }
 
 
-
   protected static void writeCommitResponse(TXCommitMessage response, Message origMsg,
       ServerConnection servConn) throws IOException {
     Message responseMsg = servConn.getResponseMessage();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
index 4da081d..08f1425 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommandTest.java
@@ -14,14 +14,30 @@
  */
 package org.apache.geode.internal.cache.tier.sockets.command;
 
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.net.InetAddress;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.TransactionInDoubtException;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.MembershipManager;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
 import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -50,4 +66,45 @@ public class CommitCommandTest {
 
     CommitCommand.writeCommitResponse(null, origMsg, servConn);
   }
+
+  /**
+   * GEODE-5269 CommitConflictException after TransactionInDoubtException
+   * CommitCommand needs to stall waiting for the host of a transaction to
+   * finish shutting down before sending a TransactionInDoubtException to
+   * the client.
+   */
+  @Test
+  public void testTransactionInDoubtWaitsForTargetDeparture() throws Exception {
+    CommitCommand command = (CommitCommand) CommitCommand.getCommand();
+    Message clientMessage = mock(Message.class);
+    ServerConnection serverConnection = mock(ServerConnection.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    TXStateProxy txProxy = mock(TXStateProxy.class);
+    InternalCache cache = mock(InternalCache.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    MembershipManager membershipManager = mock(MembershipManager.class);
+    boolean wasInProgress = false;
+
+    doReturn(cache).when(serverConnection).getCache();
+    doReturn(distributionManager).when(cache).getDistributionManager();
+    doReturn(membershipManager).when(distributionManager).getMembershipManager();
+    doReturn(false).when(distributionManager).isCurrentMember(isA(
+        InternalDistributedMember.class));
+
+    doReturn(mock(Message.class)).when(serverConnection).getErrorResponseMessage();
+
+    doReturn(new InternalDistributedMember(InetAddress.getLocalHost(), 1234)).when(txProxy)
+        .getTarget();
+
+    TransactionInDoubtException transactionInDoubtException =
+        new TransactionInDoubtException("tx in doubt");
+    transactionInDoubtException.initCause(new CacheClosedException("testing"));
+    doThrow(transactionInDoubtException).when(txMgr).commit();
+
+    command.commitTransaction(
+        clientMessage, serverConnection, txMgr, wasInProgress, txProxy);
+
+    verify(txMgr, atLeastOnce()).commit();
+    verify(membershipManager, times(1)).waitForDeparture(isA(DistributedMember.class));
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
index fa9a6f2..c93f921 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/cache/internal/JUnit4CacheTestCase.java
@@ -41,7 +41,6 @@ import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.cache.client.ClientCache;
 import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.PoolManager;
-import org.apache.geode.cache.client.internal.InternalClientCache;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.internal.DistributionMessageObserver;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -263,8 +262,8 @@ public abstract class JUnit4CacheTestCase extends JUnit4DistributedTestCase
     }
   }
 
-  public final InternalClientCache getClientCache() {
-    return (InternalClientCache) cache;
+  public final ClientCache getClientCache() {
+    return (ClientCache) cache;
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
bschuchardt@apache.org.

Mime
View raw message