geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esh...@apache.org
Subject [geode] branch feature/GEODE-5401 updated: GEODE-5401: Check if transaction has been failed over before expiring client transactions.
Date Wed, 25 Jul 2018 22:49:59 GMT
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-5401
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-5401 by this push:
     new 0beb056  GEODE-5401: Check if transaction has been failed over before expiring client transactions.
0beb056 is described below

commit 0beb0565e8221926ec43631bbbe91175009dcb11
Author: eshu <eshu@pivotal.io>
AuthorDate: Wed Jul 25 15:45:35 2018 -0700

    GEODE-5401: Check if transaction has been failed over before expiring client transactions.
    
      Add a new message when sending expire client transactions to peers.
---
 ...ntServerTransactionFailoverDistributedTest.java | 338 ++++++++++++++++
 ...overWithMixedVersionServersDistributedTest.java | 443 +++++++++++++++++++++
 .../org/apache/geode/internal/DSFIDFactory.java    |   2 +
 .../geode/internal/DataSerializableFixedID.java    |   1 +
 .../apache/geode/internal/cache/TXManagerImpl.java | 207 +++++++++-
 .../geode/internal/cache/TXStateProxyImpl.java     |  23 ++
 .../cache/tier/sockets/ClientHealthMonitor.java    |  36 +-
 .../geode/internal/cache/TXManagerImplTest.java    | 188 ++++++++-
 .../geode/internal/cache/TXStateProxyImplTest.java |  22 +
 9 files changed, 1203 insertions(+), 57 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
new file mode 100644
index 0000000..1d6f413
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverDistributedTest.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.concurrent.FutureTask;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class ClientServerTransactionFailoverDistributedTest implements Serializable {
+  private String hostName;
+  private String uniqueName;
+  private String regionName;
+  private VM server1;
+  private VM server2;
+  private VM server3;
+  private VM server4;
+  private int port1;
+  private int port2;
+  private int port3;
+  private int port4;
+
+  private final int numOfOperationsInTransaction = 10;
+  private final int key1 = 1;
+  private final String originalValue = "originalValue";
+
+  @Rule
+  public DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setUp() {
+    server1 = getVM(0);
+    server2 = getVM(1);
+    server3 = getVM(2);
+    server4 = getVM(3);
+
+    hostName = getHostName();
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+  }
+
+  @Test
+  public void clientLongTransactionDoesNotLoseOperationsAfterFailoverDirectlyToTransactionHost()
+      throws Exception {
+    // server1 is the only data store for the one-bucket region; server2 is an accessor
+    port1 = server1.invoke(() -> createServerRegion(1, false));
+    server1.invoke(() -> doPut(key1, originalValue));
+    port2 = server2.invoke(() -> createServerRegion(1, true));
+
+    int numOfOperations = 5;
+    createClientRegion(true, port2, port1);
+    TransactionId txId = suspendTransaction(numOfOperations);
+    // close server2's cache while the transaction is suspended, then resume and commit
+    server2.invoke(() -> cacheRule.getCache().close());
+    resumeTransaction(txId, numOfOperations);
+
+    server1.invoke(() -> verifyTransactionResult(1, numOfOperations));
+  }
+
+  private void doPut(int key, String value) {
+    cacheRule.getCache().getRegion(regionName).put(key, value);
+  }
+
+  private int createServerRegion(int totalNumBuckets, boolean isAccessor) throws Exception {
+    PartitionAttributesFactory factory = new PartitionAttributesFactory();
+    factory.setTotalNumBuckets(totalNumBuckets);
+    if (isAccessor) {
+      factory.setLocalMaxMemory(0);
+    }
+    PartitionAttributes partitionAttributes = factory.create();
+    cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setPartitionAttributes(partitionAttributes).create(regionName);
+
+    TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+    txManager.setTransactionTimeToLiveForTest(4);
+
+    CacheServer server = cacheRule.getCache().addCacheServer();
+    server.setPort(0);
+    server.start();
+    return server.getPort();
+  }
+
+  private void createClientRegion(boolean connectToFirstPort, int... ports) {
+    clientCacheRule.createClientCache();
+
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    PoolImpl pool;
+    try {
+      pool = getPool(connectToFirstPort, ports);
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+
+    ClientRegionFactory crf =
+        clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+    crf.setPoolName(pool.getName());
+    crf.create(regionName);
+
+    if (ports.length > 1 && connectToFirstPort) {
+      // first connection to the first port in the list
+      pool.acquireConnection(new ServerLocation(hostName, ports[0]));
+    }
+  }
+
+  private PoolImpl getPool(boolean connectToFirstPort, int... ports) {
+    PoolFactory factory = PoolManager.createFactory();
+    for (int port : ports) {
+      factory.addServer(hostName, port);
+    }
+    factory.setReadTimeout(12000).setSocketBufferSize(1000);
+
+    return (PoolImpl) factory.create(uniqueName);
+  }
+
+  private TransactionId suspendTransaction(int numOfOperations) {
+    Region region = clientCacheRule.getClientCache().getRegion(regionName);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+
+    txManager.begin();
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    int whichTransaction = ((TXId) txStateProxy.getTransactionId()).getUniqId();
+    int key = getKey(whichTransaction, numOfOperations);
+    String value = getValue(key);
+    region.put(key, value);
+
+    return txManager.suspend();
+  }
+
+  private void resumeTransaction(TransactionId txId, int numOfOperations) throws Exception {
+    // Resumes txId on this thread, puts numOfOperations consecutive keys one second apart,
+    // then commits.
+    Region region = clientCacheRule.getClientCache().getRegion(regionName);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    txManager.resume(txId);
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    int whichTransaction = ((TXId) txStateProxy.getTransactionId()).getUniqId();
+
+    int initialKey = getKey(whichTransaction, numOfOperations);
+    for (int offset = 0; offset < numOfOperations; offset++) {
+      int key = initialKey + offset;
+      region.put(key, getValue(key));
+      Thread.sleep(1000);
+    }
+    txManager.commit();
+  }
+
+  private void verifyTransactionResult(int numOfTransactions, int numOfOperations) {
+    Region region = cacheRule.getCache().getRegion(regionName);
+    int numOfEntries = numOfOperations * numOfTransactions;
+    for (int i = 1; i <= numOfEntries; i++) {
+      LogService.getLogger().info("region get key {} value {} ", i, region.get(i));
+    }
+    for (int i = 1; i <= numOfEntries; i++) {
+      assertEquals("value" + i, region.get(i));
+    }
+  }
+
+  private ClientProxyMembershipID getClientId() {
+    DistributedMember distributedMember =
+        clientCacheRule.getClientCache().getInternalDistributedSystem().getDistributedMember();
+    return ClientProxyMembershipID.getClientId(distributedMember);
+  }
+
+  private void unregisterClient(ClientProxyMembershipID clientProxyMembershipID) throws Exception {
+    ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance();
+    clientHealthMonitor.removeAllConnectionsAndUnregisterClient(clientProxyMembershipID,
+        new Exception());
+  }
+
+  private int getKey(int whichTransaction, int numOfOperations) {
+    return numOfOperations * (whichTransaction - 1) + 1;
+  }
+
+  private String getValue(int key) {
+    return "value" + key;
+  }
+
+  @Test
+  public void multipleClientLongTransactionsCanFailoverWithoutLosingOperations() throws Exception {
+    // set up
+    setupClientAndServerForMultipleTransactions();
+
+    int numOfTransactions = 12;
+    Thread[] threads = new Thread[numOfTransactions];
+    FutureTask<TransactionId>[] futureTasks = new FutureTask[numOfTransactions];
+    TransactionId[] txIds = new TransactionId[numOfTransactions];
+
+    // suspend transactions
+    suspendTransactions(numOfTransactions, numOfOperationsInTransaction, threads, futureTasks,
+        txIds);
+
+    // unregister client on 2 of the servers
+    ClientProxyMembershipID clientProxyMembershipID = getClientId();
+    server1.invoke(() -> unregisterClient(clientProxyMembershipID));
+    server2.invoke(() -> unregisterClient(clientProxyMembershipID));
+
+    resumeTransactions(numOfTransactions, numOfOperationsInTransaction, threads, txIds);
+    waitForResumeTransactionsToComplete(numOfTransactions, threads);
+
+    server4.invoke(() -> verifyTransactionResult(numOfTransactions, numOfOperationsInTransaction));
+  }
+
+  @Test
+  public void multipleClientLongTransactionsCanFailoverMultipleTimesWithoutLosingOperations()
+      throws Exception {
+    // set up
+    setupClientAndServerForMultipleTransactions();
+
+    int numOfTransactions = 12;
+    int numOfOperations = 12;
+    Thread[] threads = new Thread[numOfTransactions];
+    FutureTask<TransactionId>[] futureTasks = new FutureTask[numOfTransactions];
+    TransactionId[] txIds = new TransactionId[numOfTransactions];
+
+    // suspend transactions
+    suspendTransactions(numOfTransactions, numOfOperations, threads, futureTasks, txIds);
+
+    // unregister client multiple times
+    ClientProxyMembershipID clientProxyMembershipID = getClientId();
+
+    resumeTransactions(numOfTransactions, numOfOperations, threads, txIds);
+    unregisterClientMultipleTimes(clientProxyMembershipID);
+    waitForResumeTransactionsToComplete(numOfTransactions, threads);
+
+    server4.invoke(() -> verifyTransactionResult(numOfTransactions, numOfOperations));
+  }
+
+  private void waitForResumeTransactionsToComplete(int numOfTransactions, Thread[] threads)
+      throws InterruptedException {
+    for (int i = 0; i < numOfTransactions; i++) {
+      threads[i].join();
+    }
+  }
+
+  private void suspendTransactions(int numOfTransactions, int numOfOperations, Thread[] threads,
+      FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    for (int i = 0; i < numOfTransactions; i++) {
+      FutureTask<TransactionId> futureTask =
+          new FutureTask<>(() -> suspendTransaction(numOfOperations));
+      futureTasks[i] = futureTask;
+      Thread thread = new Thread(futureTask);
+      threads[i] = thread;
+      thread.start();
+    }
+
+    for (int i = 0; i < numOfTransactions; i++) {
+      txIds[i] = futureTasks[i].get();
+    }
+  }
+
+  private void setupClientAndServerForMultipleTransactions() {
+    port4 = server4.invoke(() -> createServerRegion(1, false));
+    server4.invoke(() -> doPut(key1, originalValue));
+    port1 = server1.invoke(() -> createServerRegion(1, true));
+    port2 = server2.invoke(() -> createServerRegion(1, true));
+    port3 = server3.invoke(() -> createServerRegion(1, true));
+    createClientRegion(false, port1, port2, port3, port4);
+  }
+
+  private void resumeTransactions(int numOfTransactions, int numOfOperations, Thread[] threads,
+      TransactionId[] txIds)
+      throws InterruptedException {
+    for (int i = 0; i < numOfTransactions; i++) {
+      TransactionId txId = txIds[i];
+      Thread thread = new Thread(() -> {
+        try {
+          resumeTransaction(txId, numOfOperations);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+      thread.start();
+      threads[i] = thread;
+    }
+  }
+
+  private void unregisterClientMultipleTimes(ClientProxyMembershipID clientProxyMembershipID)
+      throws Exception {
+    int numOfUnregisterClients = 4;
+    for (int i = 0; i < numOfUnregisterClients; i++) {
+      getVM(i).invoke(() -> unregisterClient(clientProxyMembershipID));
+      Thread.sleep(1000);
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
new file mode 100644
index 0000000..90e378c
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/ClientServerTransactionFailoverWithMixedVersionServersDistributedTest.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.Properties;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.standalone.VersionManager;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class ClientServerTransactionFailoverWithMixedVersionServersDistributedTest
+    implements Serializable {
+  private String hostName;
+  private String uniqueName;
+  private String regionName;
+  private VM server1;
+  private VM server2;
+  private VM server3;
+  private VM server4;
+  private VM locator;
+  private VM client;
+  private int locatorPort;
+  private File locatorLog;
+  private Host host;
+  private final int transactionTimeoutSecond = 2;
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule(6);
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setup() throws Exception {
+    host = Host.getHost(0);
+    String startingVersion = "160";
+    server1 = host.getVM(startingVersion, 0);
+    server2 = host.getVM(startingVersion, 1);
+    server3 = host.getVM(startingVersion, 2);
+    server4 = host.getVM(startingVersion, 3);
+    client = host.getVM(startingVersion, 4);
+    locator = host.getVM(startingVersion, 5);
+
+    hostName = getHostName();
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+    locatorLog = new File(temporaryFolder.newFolder(uniqueName), "locator.log");
+  }
+
+  @Test
+  public void clientTransactionOperationsAreNotLostIfTransactionIsOnRolledServer()
+      throws Exception {
+    setupPartiallyRolledVersion();
+
+    server1.invoke(() -> createServerRegion(1, false));
+    server2.invoke(() -> createServerRegion(1, true));
+    server3.invoke(() -> createServerRegion(1, true));
+    server4.invoke(() -> createServerRegion(1, true));
+    client.invoke(() -> createClientRegion());
+
+    ClientProxyMembershipID clientProxyMembershipID = client.invoke(() -> getClientId());
+
+    int numOfTransactions = 12;
+    int numOfOperations = 12;
+    client.invokeAsync(() -> doTransactions(numOfTransactions, numOfOperations));
+
+    unregisterClientMultipleTimes(clientProxyMembershipID);
+
+    server1.invoke(() -> verifyTransactionResult(numOfTransactions, numOfOperations, false));
+    client.invoke(() -> verifyTransactionResult(numOfTransactions, numOfOperations, true));
+  }
+
+  private void doTransactions(int numOfTransactions, int numOfOperations)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    Thread[] threads = new Thread[numOfTransactions];
+    FutureTask<TransactionId>[] futureTasks = new FutureTask[numOfTransactions];
+    TransactionId[] txIds = new TransactionId[numOfTransactions];
+    // suspend transactions
+    suspendTransactions(numOfTransactions, numOfOperations, threads, futureTasks, txIds);
+    // resume transactions
+    resumeTransactions(numOfTransactions, numOfOperations, threads, txIds);
+    waitForResumeTransactionsToComplete(numOfTransactions, threads);
+  }
+
+  private void setupPartiallyRolledVersion() throws Exception {
+    locatorPort = locator.invoke(() -> startLocator());
+    server1.invoke(() -> createCacheServer());
+    server2.invoke(() -> createCacheServer());
+    server3.invoke(() -> createCacheServer());
+    server4.invoke(() -> createCacheServer());
+    client.invoke(() -> createClientCache());
+
+    // roll locator
+    locator = rollLocatorToCurrent(locator);
+    // roll server1 and server2; server3, server4 and the client are not rolled here
+    server1 = rollServerToCurrent(server1);
+    server2 = rollServerToCurrent(server2);
+  }
+
+  private int startLocator() throws IOException {
+    // Starts a locator bound to hostName on locatorPort and returns the port it reports.
+    InetAddress address = InetAddress.getByName(hostName);
+    return Locator.startLocatorAndDS(locatorPort, locatorLog, address, createLocatorConfig())
+        .getPort();
+  }
+
+  private Properties createLocatorConfig() {
+    Properties config = new Properties();
+    config.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
+    config.setProperty(ENABLE_NETWORK_PARTITION_DETECTION, "false");
+    config.setProperty(USE_CLUSTER_CONFIGURATION, "false");
+    return config;
+  }
+
+  private void createCacheServer() throws Exception {
+    cacheRule.createCache(createServerConfig());
+
+    CacheServer server = cacheRule.getCache().addCacheServer();
+    server.setPort(0);
+    server.start();
+  }
+
+  private Properties createServerConfig() {
+    Properties config = createLocatorConfig();
+    config.setProperty(LOCATORS, hostName + "[" + locatorPort + "]");
+    return config;
+  }
+
+  private void createClientCache() {
+    clientCacheRule.createClientCache();
+  }
+
+  private VM rollLocatorToCurrent(VM oldLocator) throws Exception {
+    // Roll the locator
+    oldLocator.invoke(() -> stopLocator());
+    VM rollLocator = host.getVM(VersionManager.CURRENT_VERSION, oldLocator.getId());
+    rollLocator.invoke(() -> startLocator());
+    return rollLocator;
+  }
+
+  private void stopLocator() {
+    Locator.getLocator().stop();
+  }
+
+  private VM rollServerToCurrent(VM oldServer) throws Exception {
+    // Roll the server
+    oldServer.invoke(() -> cacheRule.getCache().close());
+    VM rollServer = host.getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
+    rollServer.invoke(() -> createCacheServer());
+    return rollServer;
+  }
+
+  private void createServerRegion(int totalNumBuckets, boolean isAccessor) throws Exception {
+    PartitionAttributesFactory factory = new PartitionAttributesFactory();
+    factory.setTotalNumBuckets(totalNumBuckets);
+    if (isAccessor) {
+      factory.setLocalMaxMemory(0);
+    }
+    PartitionAttributes partitionAttributes = factory.create();
+    cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setPartitionAttributes(partitionAttributes).create(regionName);
+
+    TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+    txManager.setTransactionTimeToLiveForTest(transactionTimeoutSecond);
+  }
+
+  private void createClientRegion() throws Exception {
+    Pool pool = PoolManager.createFactory().addLocator(hostName, locatorPort).create(uniqueName);
+
+    ClientRegionFactory crf =
+        clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+    crf.setPoolName(pool.getName());
+    crf.create(regionName);
+  }
+
+  private void suspendTransactions(int numOfTransactions, int numOfOperations, Thread[] threads,
+      FutureTask<TransactionId>[] futureTasks, TransactionId[] txIds)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    for (int i = 0; i < numOfTransactions; i++) {
+      FutureTask<TransactionId> futureTask =
+          new FutureTask<>(() -> suspendTransaction(numOfOperations));
+      futureTasks[i] = futureTask;
+      Thread thread = new Thread(futureTask);
+      threads[i] = thread;
+      thread.start();
+    }
+
+    for (int i = 0; i < numOfTransactions; i++) {
+      txIds[i] = futureTasks[i].get();
+    }
+  }
+
+  private TransactionId suspendTransaction(int numOfOperations) {
+    Region region = clientCacheRule.getClientCache().getRegion(regionName);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    startTransaction(numOfOperations, region, txManager);
+
+    return txManager.suspend();
+  }
+
+  private void startTransaction(int numOfOperations, Region region, TXManagerImpl txManager) {
+    txManager.begin();
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    int whichTransaction = ((TXId) txStateProxy.getTransactionId()).getUniqId();
+    int key = getKey(whichTransaction, numOfOperations);
+    String value = getValue(key);
+    region.put(key, value);
+  }
+
+  private int getKey(int whichTransaction, int numOfOperations) {
+    return numOfOperations * (whichTransaction - 1) + 1;
+  }
+
+  private String getValue(int key) {
+    return "value" + key;
+  }
+
+  private ClientProxyMembershipID getClientId() {
+    DistributedMember distributedMember =
+        clientCacheRule.getClientCache().getInternalDistributedSystem().getDistributedMember();
+    return ClientProxyMembershipID.getClientId(distributedMember);
+  }
+
+  private void resumeTransactions(int numOfTransactions, int numOfOperations, Thread[] threads,
+      TransactionId[] txIds)
+      throws InterruptedException {
+    for (int i = 0; i < numOfTransactions; i++) {
+      TransactionId txId = txIds[i];
+      Thread thread = new Thread(() -> {
+        try {
+          resumeTransaction(txId, numOfOperations);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+      thread.start();
+      threads[i] = thread;
+    }
+  }
+
+  private void resumeTransaction(TransactionId txId, int numOfOperations) throws Exception {
+    // Resumes txId on this thread, puts numOfOperations consecutive keys one second apart,
+    // then commits.
+    Region region = clientCacheRule.getClientCache().getRegion(regionName);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    txManager.resume(txId);
+    TXStateProxyImpl txStateProxy = (TXStateProxyImpl) txManager.getTXState();
+    int whichTransaction = ((TXId) txStateProxy.getTransactionId()).getUniqId();
+
+    int firstKey = getKey(whichTransaction, numOfOperations);
+    for (int step = 0; step < numOfOperations; step++) {
+      int key = firstKey + step;
+      region.put(key, getValue(key));
+      Thread.sleep(1000);
+    }
+    txManager.commit();
+  }
+
+  private void unregisterClientMultipleTimes(ClientProxyMembershipID clientProxyMembershipID)
+      throws Exception {
+    int numOfUnregisterClients = 4;
+    for (int i = 0; i < numOfUnregisterClients; i++) {
+      getVM(i).invoke(() -> unregisterClient(clientProxyMembershipID));
+      Thread.sleep(1000);
+    }
+  }
+
+  private void unregisterClient(ClientProxyMembershipID clientProxyMembershipID) throws Exception {
+    ClientHealthMonitor clientHealthMonitor = ClientHealthMonitor.getInstance();
+    clientHealthMonitor.removeAllConnectionsAndUnregisterClient(clientProxyMembershipID,
+        new Exception());
+  }
+
+  private void waitForResumeTransactionsToComplete(int numOfTransactions, Thread[] threads)
+      throws InterruptedException {
+    for (int i = 0; i < numOfTransactions; i++) {
+      threads[i].join();
+    }
+  }
+
+  private void verifyTransactionResult(int numOfTransactions, int numOfOperations,
+      boolean isClient) {
+    Region region;
+    if (isClient) {
+      region = clientCacheRule.getClientCache().getRegion(regionName);
+    } else {
+      region = cacheRule.getCache().getRegion(regionName);
+    }
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .until(() -> assertThat(region.get(1)).isEqualTo("value1"));
+    int numOfEntries = numOfOperations * numOfTransactions;
+    for (int i = 1; i <= numOfEntries; i++) {
+      LogService.getLogger().info("region get key {} value {} ", i, region.get(i));
+    }
+    for (int i = 1; i <= numOfEntries; i++) {
+      assertEquals("value" + i, region.get(i));
+    }
+  }
+
+  @Test
+  public void clientTransactionExpiredAreRemovedOnRolledServer() throws Exception {
+    setupPartiallyRolledVersion();
+
+    server1.invoke(() -> createServerRegion(1, false));
+    server2.invoke(() -> createServerRegion(1, true));
+    server3.invoke(() -> createServerRegion(1, true));
+    server4.invoke(() -> createServerRegion(1, true));
+    client.invoke(() -> createClientRegion());
+
+    ClientProxyMembershipID clientProxyMembershipID = client.invoke(() -> getClientId());
+
+    int numOfTransactions = 12;
+    int numOfOperations = 1;
+    client.invokeAsync(() -> doUnfinishedTransactions(numOfTransactions, numOfOperations));
+
+    server1.invoke(() -> verifyTransactionAreStarted(numOfTransactions));
+
+    unregisterClientMultipleTimes(clientProxyMembershipID);
+
+    server1.invoke(() -> verifyTransactionAreExpired(numOfTransactions));
+  }
+
+  private void doUnfinishedTransactions(int numOfTransactions, int numOfOperations)
+      throws InterruptedException, java.util.concurrent.ExecutionException {
+    Thread[] threads = new Thread[numOfTransactions];
+    for (int i = 0; i < numOfTransactions; i++) {
+      Thread thread = new Thread(() -> startTransaction(numOfOperations));
+      threads[i] = thread;
+      thread.start();
+    }
+
+    for (int i = 0; i < numOfTransactions; i++) {
+      threads[i].join();
+    }
+  }
+
+  private void startTransaction(int numOfOperations) {
+    Region region = clientCacheRule.getClientCache().getRegion(regionName);
+    TXManagerImpl txManager =
+        (TXManagerImpl) clientCacheRule.getClientCache().getCacheTransactionManager();
+    startTransaction(numOfOperations, region, txManager);
+  }
+
+  private void verifyTransactionAreStarted(int numOfTransactions) {
+    TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .until(() -> assertThat(txManager.hostedTransactionsInProgressForTest())
+            .isEqualTo(numOfTransactions));
+  }
+
+  private void verifyTransactionAreExpired(int numOfTransactions) {
+    TXManagerImpl txManager = cacheRule.getCache().getTxManager();
+    Awaitility.await().atMost(60, TimeUnit.SECONDS)
+        .until(() -> assertThat(txManager.hostedTransactionsInProgressForTest()).isEqualTo(0));
+  }
+
+  @Test
+  public void clientTransactionExpiredAreRemovedOnNotYetRolledServer() throws Exception {
+    setupPartiallyRolledVersion();
+
+    server1.invoke(() -> createServerRegion(1, true));
+    server2.invoke(() -> createServerRegion(1, true));
+    server3.invoke(() -> createServerRegion(1, true));
+    server4.invoke(() -> createServerRegion(1, false));
+    client.invoke(() -> createClientRegion());
+
+    ClientProxyMembershipID clientProxyMembershipID = client.invoke(() -> getClientId());
+
+    int numOfTransactions = 12;
+    int numOfOperations = 1;
+    client.invokeAsync(() -> doUnfinishedTransactions(numOfTransactions, numOfOperations));
+
+    server4.invoke(() -> verifyTransactionAreStarted(numOfTransactions));
+
+    unregisterClientMultipleTimes(clientProxyMembershipID);
+
+    server4.invoke(() -> verifyTransactionAreExpired(numOfTransactions));
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 204459f..a6ee477 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -937,6 +937,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
     registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
         GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
     registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+    registerDSFID(EXPIRE_CLIENT_TRANSACTIONS,
+        TXManagerImpl.ExpireDisconnectedClientTransactionsMessage.class);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index ecf20ab..c2a40d3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -823,6 +823,7 @@ public interface DataSerializableFixedID extends SerializationVersions {
   short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181;
   short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
   short ABORT_BACKUP_REQUEST = 2183;
+  short EXPIRE_CLIENT_TRANSACTIONS = 2184;
 
   // NOTE, codes > 65535 will take 4 bytes to serialize
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 639656f..18cf5ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -60,6 +60,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.MembershipListener;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.SystemTimer.SystemTimerTask;
+import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.concurrent.ConcurrentHashSet;
@@ -119,6 +120,10 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
 
   private final Map<TXId, TXStateProxy> hostedTXStates;
 
+  private final Set<TXId> scheduledToBeRemovedTx =
+      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx")
+          ? new ConcurrentHashSet<TXId>() : null;
+
   /**
    * the number of client initiated transactions to store for client failover
    */
@@ -997,8 +1002,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     return val;
   }
 
-
-
   /**
    * Associate the transactional state with this thread.
    *
@@ -1017,6 +1020,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
    */
   public void unmasquerade(TXStateProxy tx) {
     if (tx != null) {
+      if (tx.isOnBehalfOfClient()) {
+        updateLastOperationTime(tx);
+      }
       cleanupTransactionIfNoLongerHost(tx);
       setTXState(null);
       tx.getLock().unlock();
@@ -1035,8 +1041,12 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     }
   }
 
+  void updateLastOperationTime(TXStateProxy tx) {
+    ((TXStateProxyImpl) tx).setLastOperationTimeFromClient(System.currentTimeMillis());
+  }
+
   /**
-   * Cleanup the remote txState after commit and rollback
+   * Cleanup the txState
    *
    * @return the TXStateProxy
    */
@@ -1050,6 +1060,12 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     }
   }
 
+  public void removeHostedTXState(Set<TXId> txIds) {
+    for (TXId txId : txIds) {
+      removeHostedTXState(txId);
+    }
+  }
+
   /**
    * Called when the CacheServer is shutdown. Removes txStates hosted on client's behalf
    */
@@ -1167,12 +1183,27 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     return result;
   }
 
-  /** remove the given TXStates */
-  public void removeTransactions(Set<TXId> txIds, boolean distribute) {
+  /**
+   * This method is now only invoked on behalf of pre-Geode 1.7.0 servers during a rolling
+   * upgrade. The remote server has waited for transactionTimeToLive and requires this server
+   * to remove the client transactions. A transaction is removed only if it has had no client
+   * activity for transactionTimeToLive.
+   */
+  public void removeExpiredClientTransactions(Set<TXId> txIds) {
     if (logger.isDebugEnabled()) {
       logger.debug("expiring the following transactions: {}", txIds);
     }
     synchronized (this.hostedTXStates) {
+      for (TXId txId : txIds) {
+        // only expire client transaction if no activity for the given transactionTimeToLive
+        scheduleToRemoveExpiredClientTransction(txId);
+      }
+    }
+  }
+
+  /** Removes the given TXStates; intended for use by tests only. */
+  public void removeTransactions(Set<TXId> txIds, boolean distribute) {
+    synchronized (this.hostedTXStates) {
       Iterator<Map.Entry<TXId, TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
       while (iterator.hasNext()) {
         Map.Entry<TXId, TXStateProxy> entry = iterator.next();
@@ -1182,10 +1213,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
         }
       }
     }
-    if (distribute) {
-      // tell other VMs to also remove the transactions
-      TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(), txIds);
-    }
   }
 
   void saveTXStateForClientFailover(TXStateProxy tx) {
@@ -1314,10 +1341,19 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     /** for deserialization */
     public TXRemovalMessage() {}
 
-    static void send(DistributionManager dm, Set recipients, Set<TXId> txIds) {
+    static void send(DistributionManager dm, Set<InternalDistributedMember> recipients,
+        Set<TXId> txIds) {
       TXRemovalMessage msg = new TXRemovalMessage();
       msg.txIds = txIds;
-      msg.setRecipients(recipients);
+      // only send to servers running a version earlier than Geode 1.7.0;
+      // newer versions use ExpireDisconnectedClientTransactionsMessage
+      Set oldVersionRecipients = new HashSet();
+      for (InternalDistributedMember recipient : recipients) {
+        if (recipient.getVersionObject().compareTo(Version.GEODE_170) < 0) {
+          oldVersionRecipients.add(recipient);
+        }
+      }
+      msg.setRecipients(oldVersionRecipients);
       dm.putOutgoing(msg);
     }
 
@@ -1340,7 +1376,148 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
       InternalCache cache = dm.getCache();
       if (cache != null) {
         TXManagerImpl mgr = cache.getTXMgr();
-        mgr.removeTransactions(this.txIds, false);
+        // check if transaction has been updated before remove it
+        mgr.removeExpiredClientTransactions(this.txIds);
+      }
+    }
+  }
+
+  public static class ExpireDisconnectedClientTransactionsMessage
+      extends HighPriorityDistributionMessage {
+    Set<TXId> txIds;
+
+    /** for deserialization */
+    public ExpireDisconnectedClientTransactionsMessage() {}
+
+    /** Sends the txIds only to the recipients whose version is Geode 1.7.0 or later. */
+    static void send(DistributionManager dm, Set<InternalDistributedMember> recipients,
+        Set<TXId> txIds) {
+      ExpireDisconnectedClientTransactionsMessage msg =
+          new ExpireDisconnectedClientTransactionsMessage();
+      msg.txIds = txIds;
+      Set<InternalDistributedMember> newVersionRecipients = new HashSet<>();
+      for (InternalDistributedMember recipient : recipients) {
+        if (recipient.getVersionObject().compareTo(Version.GEODE_170) >= 0) {
+          newVersionRecipients.add(recipient);
+        }
+      }
+      msg.setRecipients(newVersionRecipients);
+      dm.putOutgoing(msg);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeHashSet((HashSet<TXId>) this.txIds, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      this.txIds = DataSerializer.readHashSet(in);
+    }
+
+    @Override
+    public int getDSFID() {
+      return EXPIRE_CLIENT_TRANSACTIONS;
+    }
+
+    @Override
+    protected void process(ClusterDistributionManager dm) {
+      InternalCache cache = dm.getCache();
+      if (cache != null) {
+        TXManagerImpl mgr = cache.getTXMgr();
+        mgr.expireDisconnectedClientTransactions(this.txIds, false);
+      }
+    }
+  }
+
+  /** Expires the given client transactions locally and, if distribute is true, on peers too. */
+  public void expireDisconnectedClientTransactions(Set<TXId> txIds, boolean distribute) {
+    long timeout = TimeUnit.SECONDS.toMillis(getTransactionTimeToLive());
+    if (distribute) {
+      if (timeout <= 0) {
+        removeClientTransactionsOnRemoteServer(txIds);
+      } else {
+        if (logger.isDebugEnabled()) {
+          logger.debug("expiring the following transactions: {}", Arrays.toString(txIds.toArray()));
+        }
+        // schedule to send remove message to server with version earlier than geode 1.7.0
+        SystemTimerTask task = new SystemTimerTask() {
+          @Override
+          public void run2() {
+            removeClientTransactionsOnRemoteServer(txIds);
+          }
+        };
+        getCache().getCCPTimer().schedule(task, timeout);
+      }
+    }
+    // schedule to expire client transactions on server with version geode 1.7.0 and after.
+    scheduleToExpireDisconnectedClientTransactions(txIds, distribute);
+  }
+
+  void removeClientTransactionsOnRemoteServer(Set<TXId> txIds) {
+    TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(), txIds);
+  }
+
+  /** Schedules removal of the given hosted TXStates and, if distribute is true, tells peers. */
+  public void scheduleToExpireDisconnectedClientTransactions(Set<TXId> txIds, boolean distribute) {
+    // increase the client transaction timeout setting to avoid a late in-flight client operation
+    // preventing the expiration of the client transaction.
+    long timeout = (long) (TimeUnit.SECONDS.toMillis(getTransactionTimeToLive()) * 1.1);
+    if (timeout <= 0) {
+      removeHostedTXState(txIds);
+    }
+    synchronized (this.hostedTXStates) {
+      Iterator<Map.Entry<TXId, TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<TXId, TXStateProxy> entry = iterator.next();
+        if (txIds.contains(entry.getKey())) {
+          scheduleToRemoveClientTransaction(entry.getKey(), timeout);
+        }
+      }
+    }
+    if (distribute) {
+      expireClientTransactionsOnRemoteServer(txIds);
+    }
+  }
+
+  void expireClientTransactionsOnRemoteServer(Set<TXId> txIds) {
+    // tell other VMs to also add tasks to expire the transactions
+    ExpireDisconnectedClientTransactionsMessage.send(this.dm,
+        this.dm.getOtherDistributionManagerIds(), txIds);
+  }
+
+  /**
+   * Expires the hosted transaction state for the given transaction id.
+   * If the timeout is non-positive, the state is removed immediately.
+   */
+  void scheduleToRemoveClientTransaction(TXId txId, long timeout) {
+    if (timeout <= 0) {
+      removeHostedTXState(txId);
+    } else {
+      if (scheduledToBeRemovedTx != null) {
+        scheduledToBeRemovedTx.add(txId);
+      }
+      SystemTimerTask task = new SystemTimerTask() {
+        @Override
+        public void run2() {
+          scheduleToRemoveExpiredClientTransction(txId);
+          if (scheduledToBeRemovedTx != null) {
+            scheduledToBeRemovedTx.remove(txId);
+          }
+        }
+      };
+      getCache().getCCPTimer().schedule(task, timeout);
+    }
+  }
+
+  void scheduleToRemoveExpiredClientTransction(TXId txId) {
+    synchronized (this.hostedTXStates) {
+      TXStateProxy result = hostedTXStates.get(txId);
+      if (result != null) {
+        if (((TXStateProxyImpl) result).isOverTransactionTimeoutLimit()) {
+          result.close();
+          hostedTXStates.remove(txId);
+        }
       }
     }
   }
@@ -1799,7 +1976,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
       return;
     }
     if (logger.isDebugEnabled()) {
-      logger.debug("expiring the following transactions: {}", txIds);
+      logger.debug("expiring the following transactions: {}", Arrays.toString(txIds.toArray()));
     }
     synchronized (this.hostedTXStates) {
       Iterator<Map.Entry<TXId, TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
@@ -1879,4 +2056,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
     return hostedTXStates.isEmpty();
   }
 
+  public Set<TXId> getScheduledToBeRemovedTx() {
+    return scheduledToBeRemovedTx;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index 00f15c3..dc82fce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -19,6 +19,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -75,6 +76,7 @@ public class TXStateProxyImpl implements TXStateProxy {
   protected InternalDistributedMember onBehalfOfClientMember = null;
 
   private final InternalCache cache;
+  private long lastOperationTimeFromClient;
 
   public TXStateProxyImpl(InternalCache cache, TXManagerImpl managerImpl, TXId id,
       InternalDistributedMember clientMember) {
@@ -956,4 +958,25 @@ public class TXStateProxyImpl implements TXStateProxy {
       ((TXState) this.realDeal).setProxyServer(proxy);
     }
   }
+
+  /**
+   * Returns true when the time since the last client operation exceeds transactionTimeToLive.
+   */
+  public boolean isOverTransactionTimeoutLimit() {
+    long idleMillis = getCurrentTime() - getLastOperationTimeFromClient();
+    return idleMillis > TimeUnit.SECONDS.toMillis(txMgr.getTransactionTimeToLive());
+  }
+
+  long getCurrentTime() {
+    return System.currentTimeMillis();
+  }
+
+  synchronized long getLastOperationTimeFromClient() {
+    return lastOperationTimeFromClient;
+  }
+
+  public synchronized void setLastOperationTimeFromClient(long lastOperationTimeFromClient) {
+    this.lastOperationTimeFromClient = lastOperationTimeFromClient;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 72f4f6f..934d501 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -30,9 +30,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.SystemTimer.SystemTimerTask;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheClientStatus;
 import org.apache.geode.internal.cache.IncomingGatewayStatus;
@@ -40,7 +38,6 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.TXId;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
-import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -282,15 +279,12 @@ public class ClientHealthMonitor {
     }
   }
 
-  private final Set<TXId> scheduledToBeRemovedTx =
-      Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx")
-          ? new ConcurrentHashSet<TXId>() : null;
-
   /**
    * provide a test hook to track client transactions to be removed
    */
   public Set<TXId> getScheduledToBeRemovedTx() {
-    return scheduledToBeRemovedTx;
+    final TXManagerImpl txMgr = (TXManagerImpl) this._cache.getCacheTransactionManager();
+    return txMgr.getScheduledToBeRemovedTx();
   }
 
   /**
@@ -301,33 +295,13 @@ public class ClientHealthMonitor {
    */
   private void expireTXStates(ClientProxyMembershipID proxyID) {
     final TXManagerImpl txMgr = (TXManagerImpl) this._cache.getCacheTransactionManager();
-    final Set<TXId> txids =
+    final Set<TXId> txIds =
         txMgr.getTransactionsForClient((InternalDistributedMember) proxyID.getDistributedMember());
     if (this._cache.isClosed()) {
       return;
     }
-    long timeout = txMgr.getTransactionTimeToLive() * 1000;
-    if (!txids.isEmpty()) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("expiring {} transaction contexts for {} timeout={}", txids.size(), proxyID,
-            timeout / 1000);
-      }
-
-      if (timeout <= 0) {
-        txMgr.removeTransactions(txids, true);
-      } else {
-        if (scheduledToBeRemovedTx != null)
-          scheduledToBeRemovedTx.addAll(txids);
-        SystemTimerTask task = new SystemTimerTask() {
-          @Override
-          public void run2() {
-            txMgr.removeTransactions(txids, true);
-            if (scheduledToBeRemovedTx != null)
-              scheduledToBeRemovedTx.removeAll(txids);
-          }
-        };
-        this._cache.getCCPTimer().schedule(task, timeout);
-      }
+    if (!txIds.isEmpty()) {
+      txMgr.expireDisconnectedClientTransactions(txIds, true);
     }
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
index 91ba412..cf42bab 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXManagerImplTest.java
@@ -21,39 +21,58 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.awaitility.Awaitility;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.partitioned.DestroyMessage;
 import org.apache.geode.test.fake.Fakes;
 
 
 public class TXManagerImplTest {
   private TXManagerImpl txMgr;
-  TXId txid;
-  DestroyMessage msg;
-  TXCommitMessage txCommitMsg;
-  TXId completedTxid;
-  TXId notCompletedTxid;
-  InternalDistributedMember member;
-  CountDownLatch latch;
-  TXStateProxy tx1, tx2;
-  ClusterDistributionManager dm;
-  TXRemoteRollbackMessage rollbackMsg;
-  TXRemoteCommitMessage commitMsg;
+  private TXId txid;
+  private DestroyMessage msg;
+  private TXCommitMessage txCommitMsg;
+  private TXId completedTxid;
+  private TXId notCompletedTxid;
+  private InternalDistributedMember member;
+  private CountDownLatch latch;
+  private TXStateProxy tx1, tx2;
+  private ClusterDistributionManager dm;
+  private TXRemoteRollbackMessage rollbackMsg;
+  private TXRemoteCommitMessage commitMsg;
+  private InternalCache cache;
+  private TXManagerImpl spyTxMgr;
+  private InternalCache spyCache;
+  private SystemTimer timer;
 
   @Before
   public void setUp() {
-    InternalCache cache = Fakes.cache();
+    cache = Fakes.cache();
     dm = mock(ClusterDistributionManager.class);
     txMgr = new TXManagerImpl(mock(CachePerfStats.class), cache);
     txid = new TXId(null, 0);
@@ -69,6 +88,13 @@ public class TXManagerImplTest {
     when(this.msg.canStartRemoteTransaction()).thenReturn(true);
     when(this.msg.canParticipateInTransaction()).thenReturn(true);
 
+    spyCache = spy(Fakes.cache());
+    InternalDistributedSystem distributedSystem = mock(InternalDistributedSystem.class);
+    doReturn(distributedSystem).when(spyCache).getDistributedSystem();
+    when(distributedSystem.getDistributionManager()).thenReturn(dm);
+    spyTxMgr = spy(new TXManagerImpl(mock(CachePerfStats.class), spyCache));
+    timer = mock(SystemTimer.class);
+    doReturn(timer).when(spyCache).getCCPTimer();
   }
 
   @Test
@@ -349,7 +375,7 @@ public class TXManagerImplTest {
   }
 
   @Test
-  public void txStateCleanedupIfRemovedFromHostedTxStatesMap() {
+  public void txStateCleanedUpIfRemovedFromHostedTxStatesMap() {
     tx1 = txMgr.getOrSetHostedTXState(txid, msg);
     TXStateProxyImpl txStateProxy = (TXStateProxyImpl) tx1;
     assertNotNull(txStateProxy);
@@ -361,4 +387,140 @@ public class TXManagerImplTest {
     txMgr.unmasquerade(tx1);
     assertTrue(txStateProxy.getLocalRealDeal().isClosed());
   }
+
+  @Test
+  public void clientTransactionWithIdleTimeLongerThanTransactionTimeoutIsRemoved()
+      throws Exception {
+    when(msg.getTXOriginatorClient()).thenReturn(mock(InternalDistributedMember.class));
+    TXStateProxyImpl tx = spy((TXStateProxyImpl) txMgr.getOrSetHostedTXState(txid, msg));
+    doReturn(true).when(tx).isOverTransactionTimeoutLimit();
+
+    txMgr.scheduleToRemoveExpiredClientTransction(txid);
+
+    assertTrue(txMgr.isHostedTXStatesEmpty());
+  }
+
+  @Test
+  public void processExpireDisconnectedClientTransactionsMessageWillExpireDisconnectedClientTransactions() {
+    TXManagerImpl.ExpireDisconnectedClientTransactionsMessage message =
+        new TXManagerImpl.ExpireDisconnectedClientTransactionsMessage();
+
+    InternalCache cache = mock(InternalCache.class);
+    TXManagerImpl txManager = mock(TXManagerImpl.class);
+    when(dm.getCache()).thenReturn(cache);
+    when(cache.getTXMgr()).thenReturn(txManager);
+
+    message.process(dm);
+
+    verify(txManager, times(1)).expireDisconnectedClientTransactions(any(), eq(false));
+  }
+
+  @Test
+  public void clientTransactionsToBeRemovedAndDistributedAreSentToRemoveServerIfWithNoTimeout() {
+    Set<TXId> txIds = (Set<TXId>) mock(Set.class);
+    doReturn(0).when(spyTxMgr).getTransactionTimeToLive();
+    when(txIds.iterator()).thenAnswer(new Answer<Iterator<TXId>>() {
+      @Override
+      public Iterator<TXId> answer(InvocationOnMock invocation) throws Throwable {
+        return Arrays.asList(txid, mock(TXId.class)).iterator();
+      }
+    });
+
+    spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
+
+    verify(spyTxMgr, times(1)).removeClientTransactionsOnRemoteServer(eq(txIds));
+    verify(spyTxMgr, times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
+  }
+
+  @Test
+  public void clientTransactionsToBeExpiredAreRemovedAndNotDistributedIfWithNoTimeout() {
+    doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
+    TXId txId1 = mock(TXId.class);
+    TXId txId2 = mock(TXId.class);
+    TXId txId3 = mock(TXId.class);
+    tx1 = spyTxMgr.getOrSetHostedTXState(txId1, msg);
+    tx2 = spyTxMgr.getOrSetHostedTXState(txId2, msg);
+    Set<TXId> txIds = spy(new HashSet<>());
+    txIds.add(txId1);
+    doReturn(0).when(spyTxMgr).getTransactionTimeToLive();
+    when(txIds.iterator()).thenAnswer(new Answer<Iterator<TXId>>() {
+      @Override
+      public Iterator<TXId> answer(InvocationOnMock invocation) throws Throwable {
+        return Arrays.asList(txId1, txId3).iterator();
+      }
+    });
+    assertEquals(2, spyTxMgr.getHostedTXStates().size());
+
+    spyTxMgr.expireDisconnectedClientTransactions(txIds, false);
+
+    verify(spyTxMgr, never()).removeClientTransactionsOnRemoteServer(eq(txIds));
+    verify(spyTxMgr, times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(false));
+    verify(spyTxMgr, times(1)).removeHostedTXState(eq(txIds));
+    verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId1));
+    verify(spyTxMgr, times(1)).removeHostedTXState(eq(txId3));
+    assertEquals(tx2, spyTxMgr.getHostedTXStates().get(txId2));
+    assertEquals(1, spyTxMgr.getHostedTXStates().size());
+  }
+
+  @Test
+  public void clientTransactionsToBeRemovedAndDistributedAreScheduledToSentToRemoveServerIfWithTimeout() {
+    Set<TXId> txIds = mock(Set.class);
+    doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
+
+    spyTxMgr.expireDisconnectedClientTransactions(txIds, true);
+
+    verify(timer, times(1)).schedule(any(), eq(1000L));
+    verify(spyTxMgr, times(1)).scheduleToExpireDisconnectedClientTransactions(eq(txIds), eq(true));
+  }
+
+  @Test
+  public void clientTransactionsToBeExpiredAndDistributedAreSentToRemoveServer() {
+    Set<TXId> txIds = mock(Set.class);
+
+    spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, true);
+
+    verify(spyTxMgr, times(1)).expireClientTransactionsOnRemoteServer(eq(txIds));
+  }
+
+  @Test
+  public void clientTransactionsNotToBeDistributedAreNotSentToRemoveServer() {
+    Set<TXId> txIds = mock(Set.class);
+
+    spyTxMgr.scheduleToExpireDisconnectedClientTransactions(txIds, false);
+
+    verify(spyTxMgr, never()).expireClientTransactionsOnRemoteServer(eq(txIds));
+  }
+
+  @Test
+  public void clientTransactionsToBeExpiredIsScheduledToBeRemoved() {
+    doReturn(1).when(spyTxMgr).getTransactionTimeToLive();
+    TXId txId1 = mock(TXId.class);
+    TXId txId2 = mock(TXId.class);
+    TXId txId3 = mock(TXId.class);
+    tx1 = spyTxMgr.getOrSetHostedTXState(txId1, msg);
+    tx2 = spyTxMgr.getOrSetHostedTXState(txId2, msg);
+    Set<TXId> set = new HashSet<>();
+    set.add(txId1);
+    set.add(txId2);
+
+    spyTxMgr.scheduleToExpireDisconnectedClientTransactions(set, false);
+
+    verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId1), eq(1100L));
+    verify(spyTxMgr, times(1)).scheduleToRemoveClientTransaction(eq(txId2), eq(1100L));
+    verify(spyTxMgr, never()).scheduleToRemoveClientTransaction(eq(txId3), eq(1100L));
+  }
+
+  @Test
+  public void clientTransactionIsRemovedIfWithNoTimeout() {
+    spyTxMgr.scheduleToRemoveClientTransaction(txid, 0);
+
+    verify(spyTxMgr, times(1)).removeHostedTXState(eq(txid));
+  }
+
+  @Test
+  public void clientTransactionIsScheduledToBeRemovedIfWithTimeout() {
+    spyTxMgr.scheduleToRemoveClientTransaction(txid, 1000);
+
+    verify(timer, times(1)).schedule(any(), eq(1000L));
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
index 1113246..547f5ce 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateProxyImplTest.java
@@ -15,7 +15,9 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
@@ -77,4 +79,24 @@ public class TXStateProxyImplTest {
     TXStateProxyImpl tx = new TXStateProxyImpl(cache, txManager, txId, false);
     assertThat(tx.getCache()).isSameAs(cache);
   }
+
+  @Test
+  public void isOverTransactionTimeoutLimitReturnsTrueIfNotHavingRecentOperation() {
+    TXStateProxyImpl tx = spy(new TXStateProxyImpl(cache, txManager, txId, false));
+    doReturn(0L).when(tx).getLastOperationTimeFromClient();
+    doReturn(1001L).when(tx).getCurrentTime();
+    when(txManager.getTransactionTimeToLive()).thenReturn(1);
+    // 1001 ms since the last operation is just over the 1 second (1000 ms) time-to-live
+    assertThat(tx.isOverTransactionTimeoutLimit()).isTrue();
+  }
+
+  @Test
+  public void isOverTransactionTimeoutLimitReturnsFalseIfHavingRecentOperation() {
+    TXStateProxyImpl tx = spy(new TXStateProxyImpl(cache, txManager, txId, false));
+    doReturn(0L).when(tx).getLastOperationTimeFromClient();
+    doReturn(1000L).when(tx).getCurrentTime();
+    when(txManager.getTransactionTimeToLive()).thenReturn(1);
+    // exactly 1000 ms since the last operation does not exceed the 1 second time-to-live
+    assertThat(tx.isOverTransactionTimeoutLimit()).isFalse();
+  }
 }


Mime
View raw message