geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esh...@apache.org
Subject [geode] branch develop updated: GEODE-4142: save commit message for failover after jta commit (#1324)
Date Fri, 26 Jan 2018 17:44:47 GMT
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 47df59a  GEODE-4142: save commit message for failover after jta commit (#1324)
47df59a is described below

commit 47df59a967bbb30121ea6e996f124cfb8529b098
Author: pivotal-eshu <eshu@pivotal.io>
AuthorDate: Fri Jan 26 09:44:44 2018 -0800

    GEODE-4142: save commit message for failover after jta commit (#1324)
    
    * GEODE-4142: save commit message for failover after jta commit.
    
      check if the jta has completed before retrying the beforeCompeltion and afterCompletion
in failover cases.
---
 .../geode/internal/cache/DistTXCommitMessage.java  | 13 ++--
 .../internal/cache/JtaAfterCompletionMessage.java  | 12 +++-
 .../internal/cache/JtaBeforeCompletionMessage.java |  6 ++
 .../geode/internal/cache/TXCommitMessage.java      |  4 ++
 .../apache/geode/internal/cache/TXManagerImpl.java |  6 +-
 .../internal/cache/TXRemoteCommitMessage.java      | 17 +++--
 .../org/apache/geode/internal/cache/TXState.java   |  7 +-
 .../cache/tier/sockets/command/CommitCommand.java  |  5 +-
 .../sockets/command/TXSynchronizationCommand.java  | 27 +++++++-
 .../cache/JtaAfterCompletionMessageTest.java       | 52 +++++++++++++++
 .../cache/JtaBeforeCompletionMessageTest.java      | 52 +++++++++++++++
 .../internal/jta/ClientServerJTADUnitTest.java     | 77 +++++++++++++++++++---
 12 files changed, 246 insertions(+), 32 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
index bb1f60b..fd31de7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java
@@ -77,17 +77,16 @@ public class DistTXCommitMessage extends TXMessage {
     InternalCache cache = dm.getCache();
     TXManagerImpl txMgr = cache.getTXMgr();
     final TXStateProxy txStateProxy = txMgr.getTXState();
-    TXCommitMessage cmsg = null;
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     try {
       // do the actual commit, only if it was not done before
-      if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      if (commitMessage != null) {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "DistTXCommitMessage.operateOnTx: found a previously committed transaction:{}",
txId);
         }
-        cmsg = txMgr.getRecentlyCompletedMessage(txId);
-        if (txMgr.isExceptionToken(cmsg)) {
-          throw txMgr.getExceptionForToken(cmsg, txId);
+        if (txMgr.isExceptionToken(commitMessage)) {
+          throw txMgr.getExceptionForToken(commitMessage, txId);
         }
       } else {
         // [DISTTX] TODO - Handle scenarios of no txState
@@ -126,13 +125,13 @@ public class DistTXCommitMessage extends TXMessage {
 
           txMgr.commit();
 
-          cmsg = txStateProxy.getCommitMessage();
+          commitMessage = txStateProxy.getCommitMessage();
         }
       }
     } finally {
       txMgr.removeHostedTXState(txId);
     }
-    DistTXCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    DistTXCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage, getReplySender(dm));
 
     /*
      * return false so there isn't another reply
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
index fc4eca5..b177cf8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaAfterCompletionMessage.java
@@ -79,11 +79,19 @@ public class JtaAfterCompletionMessage extends TXMessage {
     if (logger.isDebugEnabled()) {
       logger.debug("JTA: Calling afterCompletion for :{}", txId);
     }
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMessage != null) {
+      TXCommitMessage message =
+          commitMessage == TXCommitMessage.ROLLBACK_MSG ? null : commitMessage;
+      TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), message, getReplySender(dm));
+      return false;
+    }
     TXStateProxy txState = txMgr.getTXState();
     txState.setCommitOnBehalfOfRemoteStub(true);
     txState.afterCompletion(status);
-    TXCommitMessage cmsg = txState.getCommitMessage();
-    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    commitMessage = txState.getCommitMessage();
+    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+        getReplySender(dm));
     txMgr.removeHostedTXState(txId);
     return false;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
index 13bf946..894a7ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessage.java
@@ -61,6 +61,12 @@ public class JtaBeforeCompletionMessage extends TXMessage {
     if (logger.isDebugEnabled()) {
       logger.debug("JTA: Calling beforeCompletion for :{}", txId);
     }
+    // Check if jta has been completed, possible due to tx failover.
+    // No need to execute beforeCompletion if already completed.
+    if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      return true;
+    }
+
     txMgr.getTXState().beforeCompletion();
     return true;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index f32f178..9b4cab2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -144,6 +144,10 @@ public class TXCommitMessage extends PooledDistributionMessage
    * transaction
    */
   public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage();
+  /**
+   * A token to be put in TXManagerImpl#failoverMap to represent a rolled back transaction
+   */
+  public static final TXCommitMessage ROLLBACK_MSG = new TXCommitMessage();
 
   public TXCommitMessage(TXId txIdent, DistributionManager dm, TXState txState) {
     this.dm = dm;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index c73c18c..53a4657 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -1146,9 +1146,11 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     }
   }
 
-  private void saveTXStateForClientFailover(TXStateProxy tx) {
+  void saveTXStateForClientFailover(TXStateProxy tx) {
     if (tx.isOnBehalfOfClient() && tx.isRealDealLocal()) {
-      failoverMap.put(tx.getTxId(), tx.getCommitMessage());
+      TXCommitMessage commitMessage =
+          tx.getCommitMessage() == null ? TXCommitMessage.ROLLBACK_MSG : tx.getCommitMessage();
+      failoverMap.put(tx.getTxId(), commitMessage);
       if (logger.isDebugEnabled()) {
         logger.debug(
             "TX: storing client initiated transaction:{}; now there are {} entries in the
failoverMap",
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
index 6b55610..8417884 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
@@ -81,16 +81,15 @@ public class TXRemoteCommitMessage extends TXMessage {
       logger.debug("TX: Committing: {}", txId);
     }
     final TXStateProxy txState = txMgr.getTXState();
-    TXCommitMessage cmsg = null;
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
     try {
       // do the actual commit, only if it was not done before
-      if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+      if (commitMessage != null) {
         if (logger.isDebugEnabled()) {
           logger.debug("TX: found a previously committed transaction:{}", txId);
         }
-        cmsg = txMgr.getRecentlyCompletedMessage(txId);
-        if (txMgr.isExceptionToken(cmsg)) {
-          throw txMgr.getExceptionForToken(cmsg, txId);
+        if (txMgr.isExceptionToken(commitMessage)) {
+          throw txMgr.getExceptionForToken(commitMessage, txId);
         }
       } else {
         // if no TXState was created (e.g. due to only getEntry/size operations
@@ -98,13 +97,14 @@ public class TXRemoteCommitMessage extends TXMessage {
         if (txState != null) {
           txState.setCommitOnBehalfOfRemoteStub(true);
           txMgr.commit();
-          cmsg = txState.getCommitMessage();
+          commitMessage = txState.getCommitMessage();
         }
       }
     } finally {
       txMgr.removeHostedTXState(txId);
     }
-    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), cmsg, getReplySender(dm));
+    TXRemoteCommitReplyMessage.send(getSender(), getProcessorId(), commitMessage,
+        getReplySender(dm));
 
     /*
      * return false so there isn't another reply
@@ -172,6 +172,9 @@ public class TXRemoteCommitMessage extends TXMessage {
     public static void send(InternalDistributedMember recipient, int processorId,
         TXCommitMessage val, ReplySender replySender) throws RemoteOperationException {
       Assert.assertTrue(recipient != null, "TXRemoteCommitReply NULL reply message");
+      if (val != null) {
+        val.setClientVersion(null);
+      }
       TXRemoteCommitReplyMessage m = new TXRemoteCommitReplyMessage(processorId, val);
       m.setRecipient(recipient);
       replySender.putOutgoing(m);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index 0fa64cd..d152390 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -1054,8 +1054,9 @@ public class TXState implements TXStateInterface {
         Assert.assertTrue(this.locks != null,
             "Gemfire Transaction afterCompletion called with illegal state.");
         try {
-          this.proxy.getTxMgr().setTXState(null);
+          proxy.getTxMgr().setTXState(null);
           commit();
+          saveTXCommitMessageForClientFailover();
         } catch (CommitConflictException error) {
           Assert.assertTrue(false, "Gemfire Transaction " + getTransactionId()
               + " afterCompletion failed.due to CommitConflictException: " + error);
@@ -1069,6 +1070,7 @@ public class TXState implements TXStateInterface {
         this.jtaLifeTime = opStart - getBeginTime();
         this.proxy.getTxMgr().setTXState(null);
         rollback();
+        saveTXCommitMessageForClientFailover();
         this.proxy.getTxMgr().noteRollbackSuccess(opStart, this.jtaLifeTime, this);
         break;
       default:
@@ -1077,6 +1079,9 @@ public class TXState implements TXStateInterface {
     // System.err.println("end afterCompletion");
   }
 
+  private void saveTXCommitMessageForClientFailover() {
+    proxy.getTxMgr().saveTXStateForClientFailover(proxy);
+  }
 
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index bcdc632..2ab92e4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -56,9 +56,8 @@ public class CommitCommand extends BaseCommand {
         (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
     int uniqId = clientMessage.getTransactionId();
     TXId txId = new TXId(client, uniqId);
-    TXCommitMessage commitMsg = null;
-    if (txMgr.isHostedTxRecentlyCompleted(txId)) {
-      commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+    TXCommitMessage commitMsg = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMsg != null) {
       if (logger.isDebugEnabled()) {
         logger.debug("TX: returning a recently committed txMessage for tx: {}", txId);
       }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index 41a50f9..fd9c17f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@ -25,6 +25,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.TXSynchronizationRunnable;
@@ -72,7 +73,7 @@ public class TXSynchronizationCommand extends BaseCommand {
   public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
       final SecurityService securityService, long start)
       throws IOException, ClassNotFoundException, InterruptedException {
-
+    final boolean isDebugEnabled = logger.isDebugEnabled();
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
 
     CompletionType type = CompletionType.values()[clientMessage.getPart(0).getInt()];
@@ -93,11 +94,33 @@ public class TXSynchronizationCommand extends BaseCommand {
     // get the tx state without associating it with this thread. That's done later
     final TXStateProxy txProxy = txMgr.masqueradeAs(clientMessage, member, true);
 
+    final TXId txId = txProxy.getTxId();
+    TXCommitMessage commitMessage = txMgr.getRecentlyCompletedMessage(txId);
+    if (commitMessage != null && commitMessage != TXCommitMessage.ROLLBACK_MSG) {
+      assert type == CompletionType.AFTER_COMPLETION;
+      try {
+        CommitCommand.writeCommitResponse(commitMessage, clientMessage, serverConnection);
+      } catch (IOException e) {
+        if (isDebugEnabled) {
+          logger.debug("Problem writing reply to client", e);
+        }
+      } catch (RuntimeException e) {
+        try {
+          writeException(clientMessage, e, false, serverConnection);
+        } catch (IOException ioe) {
+          if (isDebugEnabled) {
+            logger.debug("Problem writing reply to client", ioe);
+          }
+        }
+      }
+      serverConnection.setAsTrue(RESPONDED);
+      return;
+    }
+
     // we have to run beforeCompletion and afterCompletion in the same thread
     // because beforeCompletion obtains locks for the thread and afterCompletion
     // releases them
     if (txProxy != null) {
-      final boolean isDebugEnabled = logger.isDebugEnabled();
       try {
         if (type == CompletionType.BEFORE_COMPLETION) {
           Runnable beforeCompletion = new Runnable() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
new file mode 100644
index 0000000..e6df7ef
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaAfterCompletionMessageTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaAfterCompletionMessageTest {
+  @Test
+  public void testAfterCompletionNotInvokedIfJTACompleted() throws Exception {
+    InternalCache cache = mock(InternalCache.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+    TXId txId = mock(TXId.class);
+
+    when(distributionManager.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txMgr);
+    when(txMgr.getRecentlyCompletedMessage(txId)).thenReturn(mock(TXCommitMessage.class));
+    when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+    JtaAfterCompletionMessage message = new JtaAfterCompletionMessage();
+    JtaAfterCompletionMessage spyMessage = spy(message);
+    when(spyMessage.getSender()).thenReturn(mock(InternalDistributedMember.class));
+
+    spyMessage.operateOnTx(txId, distributionManager);
+    verify(txMgr, never()).getTXState();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
new file mode 100644
index 0000000..be4744c
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/JtaBeforeCompletionMessageTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class JtaBeforeCompletionMessageTest {
+  @Test
+  public void testBeforeCompletionNotInvokedIfJTACompleted() throws Exception {
+    InternalCache cache = mock(InternalCache.class);
+    TXManagerImpl txMgr = mock(TXManagerImpl.class);
+    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+    TXId txId = mock(TXId.class);
+
+    when(distributionManager.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txMgr);
+    when(txMgr.isHostedTxRecentlyCompleted(txId)).thenReturn(true);
+    when(txMgr.getTXState()).thenReturn(mock(TXStateProxyImpl.class));
+
+    JtaBeforeCompletionMessage message = new JtaBeforeCompletionMessage(1,
+        mock(InternalDistributedMember.class), mock(ReplyProcessor21.class));
+
+    message.operateOnTx(txId, distributionManager);
+    verify(txMgr, never()).getTXState();
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
index 835fb7e..d739010 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/jta/ClientServerJTADUnitTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import javax.transaction.Status;
+import javax.transaction.TransactionManager;
 
 import org.awaitility.Awaitility;
 import org.junit.Test;
@@ -36,7 +37,10 @@ import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.TXCommitMessage;
+import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXStateProxy;
 import org.apache.geode.internal.cache.TXStateProxyImpl;
 import org.apache.geode.internal.cache.tx.ClientTXStateStub;
 import org.apache.geode.internal.logging.LogService;
@@ -61,14 +65,7 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
     getBlackboard().initBlackboard();
     final Properties properties = getDistributedSystemProperties();
 
-    final int port = server.invoke("create cache", () -> {
-      Cache cache = getCache(properties);
-      CacheServer cacheServer = createCacheServer(cache, 0);
-      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
-      region.put(key, value);
-
-      return cacheServer.getPort();
-    });
+    final int port = server.invoke(() -> createServerRegion(regionName, properties));
 
     client.invoke(() -> createClientRegion(host, port, regionName));
 
@@ -193,4 +190,68 @@ public class ClientServerJTADUnitTest extends JUnit4CacheTestCase {
     }
     assertTrue(region.get(key).equals(newValue));
   }
+
+  @Test
+  public void testClientCompletedJTAIsInFailoverMap() throws Exception {
+    final String regionName = getUniqueName();
+    final Properties properties = getDistributedSystemProperties();
+
+    final int port = server.invoke(() -> createServerRegion(regionName, properties));
+
+    createClientRegion(host, port, regionName);
+
+    Region region = getCache().getRegion(regionName);
+    assertTrue(region.get(key).equals(value));
+
+    TransactionManager JTAManager =
+        (TransactionManager) getCache().getJNDIContext().lookup("java:/TransactionManager");
+    assertNotNull(JTAManager);
+
+    // commit
+    JTAManager.begin();
+    region.put(key, newValue);
+    final TXId committedTXId = getTxId();
+    JTAManager.commit();
+    assertTrue(region.get(key).equals(newValue));
+
+    server.invoke(() -> verifyJTAIsCompleted(properties, committedTXId));
+
+    // rollback
+    JTAManager.begin();
+    region.put(key, "UncommittedValue");
+    final TXId rolledBackTXId = getTxId();
+    JTAManager.rollback();
+    assertTrue(region.get(key).equals(newValue));
+
+    server.invoke(() -> verifyJTAIsRollback(properties, rolledBackTXId));
+  }
+
+  private Integer createServerRegion(String regionName, Properties properties) {
+    Cache cache = getCache(properties);
+    CacheServer cacheServer = createCacheServer(cache, 0);
+    Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+    region.put(key, value);
+
+    return cacheServer.getPort();
+  }
+
+  private TXId getTxId() {
+    TXManagerImpl txManager = (TXManagerImpl) getCache().getCacheTransactionManager();
+    TXStateProxy txStateProxy = txManager.getTXState();
+    return txStateProxy.getTxId();
+  }
+
+  private void verifyJTAIsCompleted(Properties properties, TXId committedTXId) {
+    Cache cache = getCache(properties);
+    assertTrue(((TXManagerImpl) cache.getCacheTransactionManager())
+        .isHostedTxRecentlyCompleted(committedTXId));
+  }
+
+  private void verifyJTAIsRollback(Properties properties, TXId rollbackTXId) {
+    Cache cache = getCache(properties);
+    assertEquals(TXCommitMessage.ROLLBACK_MSG, ((TXManagerImpl) cache.getCacheTransactionManager())
+        .getRecentlyCompletedMessage(rollbackTXId));
+
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
eshu11@apache.org.

Mime
View raw message