geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esh...@apache.org
Subject incubator-geode git commit: GEODE-1546: fix issue when a proxy server was shut down before its scheduled task of removing departed client is invoked.
Date Fri, 24 Jun 2016 19:16:46 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop f38d6964b -> 87876a720


GEODE-1546: fix issue when a proxy server was shut down before its scheduled task of removing departed client is invoked.

Move transactionTimeToLive setting into TXManagerImpl.
Update proxy server to track the latest proxy after failover.
Expire client transactions if they do not fail over to new proxy servers and after transaction timeout period.
Add test cases for the above scenario.
Refactored the test code and made transactionTimeToLive setting adjustable to reduce test running time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/87876a72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/87876a72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/87876a72

Branch: refs/heads/develop
Commit: 87876a720c26ea67f284d64fcbb73ab3f33e5040
Parents: f38d696
Author: eshu <eshu@pivotal.io>
Authored: Fri Jun 24 11:51:27 2016 -0700
Committer: eshu <eshu@pivotal.io>
Committed: Fri Jun 24 11:59:59 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractCacheServer.java     |  19 +-
 .../gemfire/internal/cache/CacheServerImpl.java |   2 -
 .../internal/cache/RemoteOperationMessage.java  |   3 +-
 .../gemfire/internal/cache/TXManagerImpl.java   | 138 +++++++++-
 .../gemfire/internal/cache/TXState.java         |  11 +
 .../gemfire/internal/cache/TXStateProxy.java    |   3 +
 .../internal/cache/TXStateProxyImpl.java        |   9 +
 .../cache/partitioned/PartitionMessage.java     |   3 +-
 .../cache/tier/sockets/AcceptorImpl.java        |   3 +-
 .../cache/tier/sockets/BaseCommand.java         |   1 +
 .../cache/tier/sockets/CacheClientNotifier.java |  23 +-
 .../cache/tier/sockets/ClientHealthMonitor.java |  11 +-
 .../cache/xmlcache/CacheServerCreation.java     |   1 -
 .../disttx/DistributedTransactionDUnitTest.java |   3 +-
 .../cache/ClientServerTransactionDUnitTest.java | 276 ++++++++++++++++---
 .../tier/sockets/AcceptorImplJUnitTest.java     |  10 +-
 16 files changed, 421 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractCacheServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractCacheServer.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractCacheServer.java
index c9648a3..60e256d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractCacheServer.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractCacheServer.java
@@ -117,13 +117,6 @@ public abstract class AbstractCacheServer implements CacheServer {
    * members as clients of this server leave/crash. 
    */
   protected final ClientMembershipListener listener;
-
-  /**
-   * The number of seconds to keep transaction states for disconnected clients.
-   * This allows the client to fail over to another server and still find
-   * the transaction state to complete the transaction.
-   */
-  private int transactionTimeToLive;
   
   //////////////////////  Constructors  //////////////////////
 
@@ -147,9 +140,7 @@ public abstract class AbstractCacheServer implements CacheServer {
     this.tcpNoDelay = CacheServer.DEFAULT_TCP_NO_DELAY;
     this.maximumTimeBetweenPings = CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS;
     this.maximumMessageCount = CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT;
-    this.messageTimeToLive = CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE;
-    // TODO this should be configurable in CacheServer
-    this.transactionTimeToLive = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "cacheServer.transactionTimeToLive", 180);
+    this.messageTimeToLive = CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE;    
     this.groups = CacheServer.DEFAULT_GROUPS;
     this.bindAddress = CacheServer.DEFAULT_BIND_ADDRESS;
     this.hostnameForClients = CacheServer.DEFAULT_HOSTNAME_FOR_CLIENTS;
@@ -307,14 +298,6 @@ public abstract class AbstractCacheServer implements CacheServer {
     this.maximumMessageCount = maximumMessageCount;
   }
   
-  public void setTransactionTimeToLive(int seconds) {
-    this.transactionTimeToLive = seconds;
-  }
-  
-  public int getTransactionTimeToLive() {
-    return this.transactionTimeToLive;
-  }
-  
   public int getMessageTimeToLive() {
     return this.messageTimeToLive;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
index 2c72410..37c05eb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
@@ -264,7 +264,6 @@ public class CacheServerImpl
     setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings());
     setMaximumMessageCount(other.getMaximumMessageCount());
     setMessageTimeToLive(other.getMessageTimeToLive());
-//    setTransactionTimeToLive(other.getTransactionTimeToLive());  not implemented in CacheServer for v6.6
     setGroups(other.getGroups());
     setLoadProbe(other.getLoadProbe());
     setLoadPollInterval(other.getLoadPollInterval());
@@ -322,7 +321,6 @@ public class CacheServerImpl
                                      getMaxThreads(), 
                                      getMaximumMessageCount(),
                                      getMessageTimeToLive(),
-                                     getTransactionTimeToLive(),
                                      this.loadMonitor,
                                      overflowAttributesList, 
                                      this.isGatewayReceiver,

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index db5bcca..c6ec0ab 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -245,7 +245,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
             // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
             sendReply = false;
           } else if (tx.isInProgress()) {
-            sendReply = operateOnRegion(dm, r, startTime);       
+            sendReply = operateOnRegion(dm, r, startTime);
+            tx.updateProxyServer(this.getSender());
           }  
         } finally {
           txMgr.unmasquerade(tx);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index 1ea7f71..1512234 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -27,6 +27,7 @@ import com.gemstone.gemfire.distributed.internal.*;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
@@ -137,7 +138,13 @@ public class TXManagerImpl implements CacheTransactionManager,
    */
   private final ThreadLocal<Boolean> isTXDistributed;
   
-
+  /**
+   * The number of seconds to keep transaction states for disconnected clients.
+   * This allows the client to fail over to another server and still find
+   * the transaction state to complete the transaction.
+   */
+  private int transactionTimeToLive;
+  
   /** Constructor that implements the {@link CacheTransactionManager}
    * interface. Only only one instance per {@link com.gemstone.gemfire.cache.Cache}
    *
@@ -155,6 +162,7 @@ public class TXManagerImpl implements CacheTransactionManager,
     this.hostedTXStates = new HashMap<TXId, TXStateProxy>();
     this.txContext = new ThreadLocal<TXStateProxy>();
     this.isTXDistributed = new ThreadLocal<Boolean>();
+    this.transactionTimeToLive = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "cacheServer.transactionTimeToLive", 180);
     currentInstance = this;
   }
 
@@ -937,6 +945,7 @@ public class TXManagerImpl implements CacheTransactionManager,
         }
       }
     }
+    expireClientTransactionsSentFromDepartedProxy(id);
   }
 
   public void memberJoined(InternalDistributedMember id) {
@@ -951,9 +960,9 @@ public class TXManagerImpl implements CacheTransactionManager,
   
 
   /**
-   * retrieve the transaction states for the given client
+   * retrieve the transaction TXIds for the given client
    * @param id the client's membership ID
-   * @return a set of the currently open transaction states
+   * @return a set of the currently open TXIds
    */
   public Set<TXId> getTransactionsForClient(InternalDistributedMember id) {
     Set<TXId> result = new HashSet<TXId>();
@@ -966,6 +975,23 @@ public class TXManagerImpl implements CacheTransactionManager,
     }
     return result;
   }
+  
+  /**
+   * retrieve the transaction states for the given client
+   * @param id the client's membership ID
+   * @return a set of the currently open transaction states
+   */
+  public Set<TXStateProxy> getTransactionStatesForClient(InternalDistributedMember id) {
+    Set<TXStateProxy> result = new HashSet<TXStateProxy>();
+    synchronized (this.hostedTXStates) {
+      for (Map.Entry<TXId, TXStateProxy> entry: this.hostedTXStates.entrySet()) {
+        if (entry.getKey().getMemberId().equals(id)) {
+          result.add(entry.getValue());
+        }
+      }
+    }
+    return result;
+  }
 
   /** remove the given TXStates */
   public void removeTransactions(Set<TXId> txIds, boolean distribute) {
@@ -1500,6 +1526,112 @@ public class TXManagerImpl implements CacheTransactionManager,
     }
   }
   
+  public void setTransactionTimeToLiveForTest(int seconds) {
+    this.transactionTimeToLive = seconds;
+  }
+  
+  /**
+   * @return the time-to-live for abandoned transactions, in seconds
+   */
+  public int getTransactionTimeToLive() {
+    return this.transactionTimeToLive;
+  }
+  
+  public InternalDistributedMember getMemberId() {
+    return this.distributionMgrId;
+  }
+  
+  //expire the transaction states for the lost proxy server based on timeout setting.  
+  private void expireClientTransactionsSentFromDepartedProxy(InternalDistributedMember proxyServer) {
+    if (this.cache.isClosed()) {
+      return; 
+    }
+    long timeout = getTransactionTimeToLive() * 1000;
+    if (timeout <= 0) {
+      removeTransactionsSentFromDepartedProxy(proxyServer);
+    } else {
+      if (departedProxyServers != null) departedProxyServers.add(proxyServer);  
+      SystemTimerTask task = new SystemTimerTask() {
+        @Override
+        public void run2() {
+          removeTransactionsSentFromDepartedProxy(proxyServer);
+          if (departedProxyServers != null) departedProxyServers.remove(proxyServer);
+        }
+      };
+      try {
+        ((GemFireCacheImpl)this.cache).getCCPTimer().schedule(task, timeout);
+      } catch (IllegalStateException ise) {
+        if (!((GemFireCacheImpl)this.cache).isClosed()) {
+          throw ise;
+        }
+        //task not able to be scheduled due to cache is closing,
+        //do not set it in the test hook.
+        if (departedProxyServers != null) departedProxyServers.remove(proxyServer);
+      }
+    }
+  }
+  
+  private final Set<InternalDistributedMember> departedProxyServers = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "trackScheduledToBeRemovedTx") ?
+      new ConcurrentHashSet<InternalDistributedMember>() : null;
+
+  /**
+   * provide a test hook to track departed peers
+   */
+  public Set<InternalDistributedMember> getDepartedProxyServers() {
+    return departedProxyServers;
+  }
+  
+  /**
+   * Find all client originated transactions sent from the departed proxy server.
+   * Remove them from the hostedTXStates map after the set TransactionTimeToLive period.
+   * @param proxyServer the departed proxy server
+   */
+  public void removeTransactionsSentFromDepartedProxy(InternalDistributedMember proxyServer) {
+    final Set<TXId> txIds = getTransactionsSentFromDepartedProxy(proxyServer);
+    if (txIds.isEmpty()) {
+      return;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("expiring the following transactions: {}", txIds);
+    }
+    synchronized (this.hostedTXStates) {
+      Iterator<Map.Entry<TXId, TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<TXId,TXStateProxy> entry = iterator.next();
+        if (txIds.contains(entry.getKey())) {
+          //The TXState was not updated by any other proxy server, 
+          //The client would fail over to another proxy server.
+          //Remove it after waiting for transactionTimeToLive period.
+          entry.getValue().close();
+          iterator.remove();
+        }
+      }
+    }
+  }
+  
+  /*
+   * retrieve the transaction states for the given client from a certain proxy server.
+   * if transactions failed over, the new proxy server information should be stored
+   * in the TXState
+   * @param id the proxy server
+   * @return a set of the currently open transaction states
+   */
+  private Set<TXId> getTransactionsSentFromDepartedProxy(InternalDistributedMember proxyServer) {
+    Set<TXId> result = new HashSet<TXId>();
+    synchronized (this.hostedTXStates) {
+      for (Map.Entry<TXId, TXStateProxy> entry: this.hostedTXStates.entrySet()) {
+        TXStateProxy tx = entry.getValue();
+        if (tx.isRealDealLocal() && tx.isOnBehalfOfClient()) {
+          TXState txstate = (TXState) ((TXStateProxyImpl)tx).realDeal;          
+          if (proxyServer.equals(txstate.getProxyServer())) {
+            result.add(entry.getKey());
+          }
+        }
+      }
+    }
+    return result;
+  }
+  
   public void setDistributed(boolean flag) {
     checkClosed();
     TXStateProxy tx = getTXState();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index c42f63c..d64426b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -51,6 +51,7 @@ import com.gemstone.gemfire.cache.TransactionWriter;
 import com.gemstone.gemfire.cache.TransactionWriterException;
 import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
 import com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess;
+import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.TXManagerCancelledException;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
@@ -138,6 +139,8 @@ public class TXState implements TXStateInterface {
   private Map<EventID, Boolean> seenResults = new HashMap<EventID, Boolean>();
 
   static final TXEntryState ENTRY_EXISTS = new TXEntryState();
+  
+  private volatile DistributedMember proxyServer;
 
   public TXState(TXStateProxy proxy,boolean onBehalfOfRemoteStub) 
   {
@@ -1849,4 +1852,12 @@ public class TXState implements TXStateInterface {
   public boolean isCreatedOnDistTxCoordinator() {
     return false;
   }
+  
+  public void setProxyServer(DistributedMember proxyServer) {
+    this.proxyServer = proxyServer;
+  }
+  
+  public DistributedMember getProxyServer() {
+    return this.proxyServer;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
index ba9866e..8f8d1cd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
@@ -21,6 +21,7 @@ package com.gemstone.gemfire.internal.cache;
 
 import com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess;
 import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
 
 /**
@@ -92,4 +93,6 @@ public interface TXStateProxy extends TXStateInterface {
    * @param progress
    */
   public void setInProgress(boolean progress);
+  
+  public void updateProxyServer(InternalDistributedMember proxy);
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
index 0939ab0..c4ebadd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
@@ -1044,4 +1044,13 @@ public class TXStateProxyImpl implements TXStateProxy {
   public boolean isCreatedOnDistTxCoordinator() {
     return false;
   }
+
+  @Override
+  public void updateProxyServer(InternalDistributedMember proxy) {
+    //only update in TXState if it has one
+    if (this.realDeal != null && this.realDeal.isRealDealLocal() 
+        && isOnBehalfOfClient()) {
+      ((TXState)this.realDeal).setProxyServer(proxy);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
index 14fce08..351638a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionMessage.java
@@ -343,7 +343,8 @@ public abstract class PartitionMessage extends DistributionMessage implements
             // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
             sendReply = false;
           } else if (tx.isInProgress()) {
-            sendReply = operateOnPartitionedRegion(dm, pr, startTime);        
+            sendReply = operateOnPartitionedRegion(dm, pr, startTime); 
+            tx.updateProxyServer(this.getSender());
           }  
         } finally {
           txMgr.unmasquerade(tx);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
index ffcb8c5..cfa6333 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImpl.java
@@ -270,7 +270,6 @@ public class AcceptorImpl extends Acceptor implements Runnable
                       int socketBufferSize, int maximumTimeBetweenPings,
                       InternalCache c, int maxConnections, int maxThreads,
                       int maximumMessageCount, int messageTimeToLive,
-                      int transactionTimeToLive,
                       ConnectionListener listener,List overflowAttributesList, 
                       boolean isGatewayReceiver, List<GatewayTransportFilter> transportFilter,
                       boolean tcpNoDelay)
@@ -487,7 +486,7 @@ public class AcceptorImpl extends Acceptor implements Runnable
     this.crHelper = new CachedRegionHelper(this.cache);
 
     this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats,
-        maximumMessageCount,messageTimeToLive, transactionTimeToLive,
+        maximumMessageCount,messageTimeToLive, 
         connectionListener,overflowAttributesList, isGatewayReceiver);
     this.socketBufferSize = socketBufferSize;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index bdebb6b..aa61e76 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@ -135,6 +135,7 @@ public abstract class BaseCommand implements Command {
         try {
           tx = txMgr.masqueradeAs(msg, member, false);
           cmdExecute(msg, servConn, start);
+          tx.updateProxyServer(txMgr.getMemberId());
         } finally {
           txMgr.unmasquerade(tx);
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 30ab4a4..09ffd8f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -79,7 +79,6 @@ public class CacheClientNotifier {
    * @param acceptorStats        
    * @param maximumMessageCount
    * @param messageTimeToLive 
-   * @param transactionTimeToLive - ttl for txstates for disconnected clients
    * @param listener 
    * @param overflowAttributesList 
    * @return A <code>CacheClientNotifier</code> instance
@@ -87,13 +86,11 @@ public class CacheClientNotifier {
   public static synchronized CacheClientNotifier getInstance(Cache cache,
       CacheServerStats acceptorStats,
       int maximumMessageCount, int messageTimeToLive,
-      int transactionTimeToLive,
       ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver)
   {
     if (ccnSingleton == null) {
       ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount, 
-          messageTimeToLive, transactionTimeToLive,
-          listener, overflowAttributesList, isGatewayReceiver);
+          messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver);
     }
     
     if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
@@ -114,9 +111,6 @@ public class CacheClientNotifier {
   public static CacheClientNotifier getInstance(){
     return ccnSingleton;
   }
-
-  /** the amount of time in seconds to keep a disconnected client's txstates around */
-  private final int transactionTimeToLive;
   
   /**
    * Writes a given message to the output stream
@@ -2080,15 +2074,13 @@ public class CacheClientNotifier {
    * @param acceptorStats
    * @param maximumMessageCount
    * @param messageTimeToLive
-   * @param transactionTimeToLive - ttl for txstates for disconnected clients
    * @param listener a listener which should receive notifications
    *          abouts queues being added or removed.
    * @param overflowAttributesList
    */
   private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, 
-      int maximumMessageCount, int messageTimeToLive, int transactionTimeToLive,
-      ConnectionListener listener,
-      List overflowAttributesList, boolean isGatewayReceiver) {
+      int maximumMessageCount, int messageTimeToLive, 
+      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) {
     // Set the Cache
     this.setCache((GemFireCacheImpl)cache);
     this.acceptorStats = acceptorStats;
@@ -2104,7 +2096,6 @@ public class CacheClientNotifier {
 
     this.maximumMessageCount = maximumMessageCount;
     this.messageTimeToLive = messageTimeToLive;
-    this.transactionTimeToLive = transactionTimeToLive;
 
     // Initialize the statistics
     StatisticsFactory factory ;
@@ -2689,13 +2680,5 @@ public class CacheClientNotifier {
       }
     }
   }
-
-
-  /**
-   * @return the time-to-live for abandoned transactions, in seconds
-   */
-  public int getTransactionTimeToLive() {
-    return this.transactionTimeToLive;
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientHealthMonitor.java
index eb701fb..cdb0133 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -281,7 +281,7 @@ public class ClientHealthMonitor {
 
   /**
    * expire the transaction states for the given client.  This uses the
-   * transactionTimeToLive setting that is inherited from the CacheServer.
+   * transactionTimeToLive setting that is inherited from the TXManagerImpl.
    * If that setting is non-positive we expire the states immediately
    * @param proxyID
    */
@@ -289,12 +289,11 @@ public class ClientHealthMonitor {
     final TXManagerImpl txMgr = (TXManagerImpl)this._cache.getCacheTransactionManager(); 
     final Set<TXId> txids = txMgr.getTransactionsForClient(
           (InternalDistributedMember)proxyID.getDistributedMember());
-    CacheClientNotifier notifier = CacheClientNotifier.getInstance();
-    if (notifier == null || this._cache.isClosed()) {
-      return; // notifier is null when shutting down
+    if (this._cache.isClosed()) {
+      return; 
     }
-    long timeout = notifier.getTransactionTimeToLive() * 1000;
-    if (txids.size() > 0) {
+    long timeout = txMgr.getTransactionTimeToLive() * 1000;
+    if (!txids.isEmpty()) {
       if (logger.isDebugEnabled()) {
         logger.debug("expiring {} transaction contexts for {} timeout={}", txids.size(), proxyID, timeout/1000);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheServerCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheServerCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheServerCreation.java
index 22d684f..10f5366 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheServerCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheServerCreation.java
@@ -71,7 +71,6 @@ public class CacheServerCreation extends AbstractCacheServer {
     setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings());
     setMaximumMessageCount(other.getMaximumMessageCount());
     setMessageTimeToLive(other.getMessageTimeToLive());
-    //      setTransactionTimeToLive(other.getTransactionTimeToLive());  not implemented in CacheServer for v6.6
     setGroups(other.getGroups());
     setLoadProbe(other.getLoadProbe());
     setLoadPollInterval(other.getLoadPollInterval());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistributedTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistributedTransactionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistributedTransactionDUnitTest.java
index 8f962f7..6d9a172 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistributedTransactionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistributedTransactionDUnitTest.java
@@ -138,8 +138,9 @@ public class DistributedTransactionDUnitTest extends JUnit4CacheTestCase {
         int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
         CacheServer s = getCache().addCacheServer();
         s.setPort(port);
-        ((CacheServerImpl) s).setTransactionTimeToLive(10);
         s.start();
+        TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+        txMgr.setTransactionTimeToLiveForTest(10);
         return port;
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
index 860833e..69d3fe4 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
@@ -36,6 +36,9 @@ import java.util.concurrent.TimeUnit;
 
 import javax.naming.Context;
 import javax.transaction.UserTransaction;
+
+import com.jayway.awaitility.Awaitility;
+
 import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
 
 import com.gemstone.gemfire.cache.*;
@@ -87,6 +90,14 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
   protected void postSetUpClientServerTransactionDUnitTest() throws Exception {
   }
 
+
+  private Integer createRegionsAndStartServerWithTimeout(VM vm, boolean accessor, int txTimeoutSecs) {
+    return createRegionOnServerWithTimeout(vm, true, accessor, txTimeoutSecs);
+  }
+  private Integer createRegionOnServerWithTimeout(VM vm, final boolean startServer,
+      final boolean accessor, final int txTimeoutSecs) {
+    return createRegionOnServerWithTimeout(vm, startServer, accessor, 0, txTimeoutSecs);
+  }
   private Integer createRegionsAndStartServer(VM vm, boolean accessor) {
     return createRegionOnServer(vm, true, accessor);
   }
@@ -98,14 +109,20 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     return createRegionOnServer(vm, startServer, accessor, 0);
   }
   private Integer createRegionOnServer(VM vm, final boolean startServer, final boolean accessor, final int redundantCopies) {
+    return createRegionOnServerWithTimeout(vm, startServer, accessor, redundantCopies, 10);
+  }
+  
+  private Integer createRegionOnServerWithTimeout(VM vm, final boolean startServer, final boolean accessor, 
+      final int redundantCopies, final int txTimeoutSecs) {
     return (Integer)vm.invoke(new SerializableCallable() {
       public Object call() throws Exception {
         createRegion(accessor, redundantCopies, null);
+        TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+        txMgr.setTransactionTimeToLiveForTest(txTimeoutSecs);
         if (startServer) {
           int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
           CacheServer s = getCache().addCacheServer();
           s.setPort(port);
-          ((CacheServerImpl)s).setTransactionTimeToLive(10);
           s.start();
           return port;
         }
@@ -126,11 +143,12 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
         InternalDistributedSystem system = getSystem(props);
         Cache cache = CacheFactory.create(system);
         cache.createRegion(OTHER_REGION,af.create());
+        TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
+        txMgr.setTransactionTimeToLiveForTest(10);
         if (startServer) {
           int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
           CacheServer s = cache.addCacheServer();
           s.setPort(port);
-          ((CacheServerImpl)s).setTransactionTimeToLive(10);
           s.start();
           return port;
         }
@@ -162,9 +180,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     vm.invoke(new SerializableCallable() {
       public Object call() throws Exception {
         ClientCacheFactory ccf = new ClientCacheFactory();
-        ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port);
-        ccf.setPoolSubscriptionEnabled(false);
-        ccf.set(LOG_LEVEL, getDUnitLogLevel());
+        setCCF(port, ccf);
         // these settings were used to manually check that tx operation stats were being updated
         //ccf.set(STATISTIC_SAMPLING_ENABLED, "true");
         //ccf.set(STATISTIC_ARCHIVE_FILE, "clientStats.gfs");
@@ -195,10 +211,11 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     // a client VM
     final int serverPort = (Integer)accessor.invoke(new SerializableCallable("create cache server") {
       public Object call() throws Exception {
+        TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+        txMgr.setTransactionTimeToLiveForTest(10);
         int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
         CacheServer s = getCache().addCacheServer();
         s.setPort(port);
-        ((CacheServerImpl)s).setTransactionTimeToLive(10);
         s.start();
         return port;
       }
@@ -222,10 +239,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     final int port1 = createRegionsAndStartServer(datastore1, false);
     System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
     ClientCacheFactory ccf = new ClientCacheFactory();
-    ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
-    ccf.setPoolSubscriptionEnabled(false);
-
-    ccf.set(LOG_LEVEL, getDUnitLogLevel());
+    setCCF(port1, ccf);
 
     ClientCache cCache = getClientCache(ccf);
     
@@ -250,13 +264,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
 
     TXManagerImpl mgr = getGemfireCache().getTxManager();
     mgr.begin();
-    for (int i=0; i<5; i++) {
-      CustId custId = new CustId(i);
-      Customer cust = new Customer("name"+i, "address"+i);
-      getGemfireCache().getLogger().info("putting:"+custId);
-      pr.put(custId, cust);
-      r.put(i, "value"+i);
-    }
+    doTxOps(r, pr);
     boolean exceptionThrown = false;
     try {
       otherRegion.put("tx", "not allowed");
@@ -290,14 +298,12 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     
     disconnectAllFromDS(); // some other VMs seem to be hanging around and have the region this tests uses
 
-    final int port1 = createRegionsAndStartServer(accessor, true);
-    createRegionOnServer(datastore, false, false);
+    final int port1 = createRegionsAndStartServerWithTimeout(accessor, true, 5);
+    createRegionOnServerWithTimeout(datastore, false, false, 5);
 
     System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
     ClientCacheFactory ccf = new ClientCacheFactory();
-    ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
-    ccf.setPoolSubscriptionEnabled(false);
-    ccf.set(LOG_LEVEL, getDUnitLogLevel());
+    setCCF(port1, ccf);
     ClientCache cCache = getClientCache(ccf);
     ClientRegionFactory<CustId, Customer> custrf = cCache
       .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
@@ -308,13 +314,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
 
     TXManagerImpl mgr = getGemfireCache().getTxManager();
     mgr.begin();
-    for (int i=0; i<5; i++) {
-      CustId custId = new CustId(i);
-      Customer cust = new Customer("name"+i, "address"+i);
-      getGemfireCache().getLogger().info("putting:"+custId);
-      pr.put(custId, cust);
-      r.put(i, "value"+i);
-    }
+    doTxOps(r, pr);
 
     final DistributedMember myId = cCache.getDistributedSystem().getDistributedMember(); 
     
@@ -359,6 +359,188 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
       cCache.close();
     }
   }
+  
+  @Test
+  public void testCleanupAfterClientAndProxyFailure() {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    final boolean cachingProxy = false;
+    
+    disconnectAllFromDS(); // some other VMs seem to be hanging around and have the region this tests uses
+
+    final int port1 = createRegionsAndStartServerWithTimeout(accessor, true, 5);
+    createRegionOnServerWithTimeout(datastore, false, false, 5);
+
+    System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
+    ClientCacheFactory ccf = new ClientCacheFactory();
+    setCCF(port1, ccf);
+    ClientCache cCache = getClientCache(ccf);
+    ClientRegionFactory<CustId, Customer> custrf = cCache
+      .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
+    ClientRegionFactory<Integer, String> refrf = cCache
+      .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
+    Region<Integer, String> r = refrf.create(D_REFERENCE);
+    Region<CustId, Customer> pr = custrf.create(CUSTOMER);
+
+    TXManagerImpl mgr = getGemfireCache().getTxManager();
+    mgr.begin();
+    doTxOps(r, pr);
+
+    final DistributedMember myId = cCache.getDistributedSystem().getDistributedMember(); 
+    
+    SerializableCallable verifyExists = new SerializableCallable("verify txstate for client exists") {
+      public Object call() throws Exception {
+        TXManagerImpl txmgr = getGemfireCache().getTxManager();
+        Set states = txmgr.getTransactionsForClient((InternalDistributedMember)myId);
+        assertEquals(1, states.size()); // only one in-progress transaction
+        return null;
+      }
+    };
+
+    accessor.invoke(verifyExists);
+    datastore.invoke(verifyExists);
+    
+    accessor.invoke(()->closeCache());
+    accessor.invoke(()->disconnectFromDS());
+    
+    SerializableCallable verifyExpired = new SerializableCallable("verify txstate is expired") {
+      public Object call() throws Exception {
+        final TXManagerImpl txmgr = getGemfireCache().getTxManager();
+        return verifyTXStateExpired(myId, txmgr);
+      }
+    };
+    try {
+      datastore.invoke(verifyExpired);
+    } finally {
+      cCache.close();
+    }
+  }
+
+  void doTxOps(Region<Integer, String> r, Region<CustId, Customer> pr) {
+    for (int i=0; i<5; i++) {
+      CustId custId = new CustId(i);
+      Customer cust = new Customer("name"+i, "address"+i);
+      getGemfireCache().getLogger().info("putting:"+custId);
+      pr.put(custId, cust);
+      r.put(i, "value"+i);
+    }
+  }
+  
+  public static DistributedMember getVMDistributedMember() {
+    return InternalDistributedSystem.getAnyInstance().getDistributedMember();
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFailoverAfterProxyFailure() throws InterruptedException {
+    Host host = Host.getHost(0);
+    VM accessor = host.getVM(0);
+    VM datastore = host.getVM(1);
+    VM accessor2 = host.getVM(2);
+    final boolean cachingProxy = false;
+    
+    disconnectAllFromDS(); // some other VMs seem to be hanging around and have the region this tests uses
+
+    int[] ports = new int[2];
+    ports[0] = createRegionsAndStartServerWithTimeout(accessor, true, 5);
+    ports[1] = createRegionsAndStartServerWithTimeout(accessor2, true, 5);
+    createRegionOnServerWithTimeout(datastore, false, false, 5);
+    
+    System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
+    ClientCacheFactory ccf = new ClientCacheFactory();
+    setCCF(ports, ccf);
+    ClientCache cCache = getClientCache(ccf);
+    ClientRegionFactory<CustId, Customer> custrf = cCache
+      .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
+    ClientRegionFactory<Integer, String> refrf = cCache
+      .createClientRegionFactory(cachingProxy ? ClientRegionShortcut.CACHING_PROXY : ClientRegionShortcut.PROXY);
+    Region<Integer, String> r = refrf.create(D_REFERENCE);
+    Region<CustId, Customer> pr = custrf.create(CUSTOMER);
+
+    TXManagerImpl mgr = getGemfireCache().getTxManager();
+    mgr.begin();
+    doTxOps(r, pr);
+
+    final DistributedMember myId = cCache.getDistributedSystem().getDistributedMember(); 
+    final DistributedMember accessorId = (DistributedMember)accessor.invoke(() -> ClientServerTransactionDUnitTest.getVMDistributedMember());
+    final DistributedMember accessor2Id = (DistributedMember)accessor2.invoke(() -> ClientServerTransactionDUnitTest.getVMDistributedMember());
+    
+    SerializableCallable verifyExists = new SerializableCallable("verify txstate for client exists") {
+      public Object call() throws Exception {
+        TXManagerImpl txmgr = getGemfireCache().getTxManager();
+        Set<TXId> states = txmgr.getTransactionsForClient((InternalDistributedMember)myId);
+        assertEquals(1, states.size()); // only one in-progress transaction
+        return null;
+      }
+    };
+
+    datastore.invoke(verifyExists);
+    
+    SerializableCallable getProxyServer = new SerializableCallable("get proxy server") {
+      public Object call() throws Exception {
+        final TXManagerImpl txmgr = getGemfireCache().getTxManager();
+        DistributedMember proxyServer = null;
+        TXStateProxyImpl tx = null;
+        Set<TXStateProxy> states = txmgr.getTransactionStatesForClient((InternalDistributedMember)myId);
+        assertEquals(1, states.size()); 
+        Iterator<TXStateProxy> iterator = states.iterator();
+        if (iterator.hasNext()) {
+          tx = (TXStateProxyImpl)iterator.next();
+          assertTrue(tx.isRealDealLocal());
+          proxyServer = ((TXState)tx.realDeal).getProxyServer();
+        }
+        return proxyServer;
+      }
+    };
+    
+    final DistributedMember proxy = (DistributedMember) datastore.invoke(getProxyServer);
+    
+    if (proxy.equals(accessorId)) {
+      accessor.invoke(()->closeCache());
+      accessor.invoke(()->disconnectFromDS());
+    } else {
+      assertTrue(proxy.equals(accessor2Id));
+      accessor2.invoke(()->closeCache());
+      accessor2.invoke(()->disconnectFromDS());
+    }
+    
+    doTxOps(r, pr);
+    
+    SerializableCallable verifyProxyServerChanged = new SerializableCallable("verify proxy server is updated") {
+      public Object call() throws Exception {
+        final TXManagerImpl txmgr = getGemfireCache().getTxManager();
+        TXStateProxyImpl tx = null;
+        Set<TXStateProxy> states = txmgr.getTransactionStatesForClient((InternalDistributedMember)myId);
+        assertEquals(1, states.size()); 
+        Iterator<TXStateProxy> iterator = states.iterator();
+        if (iterator.hasNext()) {
+          tx = (TXStateProxyImpl)iterator.next();
+          assertTrue(tx.isRealDealLocal());
+        }
+        return verifyProxyServerChanged(tx, proxy);
+      }
+    };
+    try {
+      datastore.invoke(verifyProxyServerChanged);
+    } finally {
+      cCache.close();
+    }
+  }
+
+  void setCCF(final int port1, ClientCacheFactory ccf) {
+    ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
+    ccf.setPoolSubscriptionEnabled(false);
+    ccf.set(LOG_LEVEL, getDUnitLogLevel());
+  }
+  
+  void setCCF(final int[] ports, ClientCacheFactory ccf) {
+    for (int port: ports) {
+      ccf.addPoolServer("localhost", port);
+    }
+    ccf.setPoolSubscriptionEnabled(false);
+    ccf.set(LOG_LEVEL, getDUnitLogLevel());
+  }
 
   @Test
   public void testBasicCommitOnEmpty() {
@@ -1371,9 +1553,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
     client.invoke(new SerializableCallable() {
       public Object call() throws Exception {
         ClientCacheFactory ccf = new ClientCacheFactory();
-        ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
-        ccf.setPoolSubscriptionEnabled(false);
-        ccf.set(LOG_LEVEL, getDUnitLogLevel());
+        setCCF(port1, ccf);
         ClientCache cCache = getClientCache(ccf);
         ClientRegionFactory<CustId, Customer> custrf = cCache
             .createClientRegionFactory(ClientRegionShortcut.PROXY);
@@ -2785,9 +2965,7 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
           System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "bridge.disableShufflingOfEndpoints", "true");
           ClientCacheFactory ccf = new ClientCacheFactory();
           ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port);
-          ccf.addPoolServer("localhost", port2);
-          ccf.setPoolSubscriptionEnabled(false);
-          ccf.set(LOG_LEVEL, getDUnitLogLevel());
+          setCCF(port2, ccf);
           // these settings were used to manually check that tx operation stats were being updated
           //ccf.set(STATISTIC_SAMPLING_ENABLED, "true");
           //ccf.set(STATISTIC_ARCHIVE_FILE, "clientStats.gfs");
@@ -3473,4 +3651,32 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
       }
     });
   }
+
+  Object verifyTXStateExpired(final DistributedMember myId, final TXManagerImpl txmgr) {
+    try {
+      Wait.waitForCriterion(new WaitCriterion() {
+        public boolean done() {
+          Set states = txmgr.getTransactionsForClient((InternalDistributedMember)myId);
+          com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("found " + states.size() + " tx states for " + myId);
+          return states.isEmpty();
+        }
+        public String description() {
+          return "Waiting for transaction state to expire";
+        }
+      }, 15000, 500, true);
+      return null;
+    } finally {
+      getGemfireCache().getDistributedSystem().disconnect();
+    }
+  }
+  
+  Object verifyProxyServerChanged(final TXStateProxyImpl tx, final DistributedMember newProxy) {
+    try {
+      Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+      .atMost(30, TimeUnit.SECONDS).until(() -> !((TXState)tx.realDeal).getProxyServer().equals(newProxy));
+      return null;
+    } finally {
+      getGemfireCache().getDistributedSystem().disconnect();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/87876a72/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 7c52a89..b703f00 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -84,7 +84,7 @@ public class AcceptorImplJUnitTest
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1,
           CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
-          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,0,null,null, false, Collections.EMPTY_LIST,
+          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,null,null, false, Collections.EMPTY_LIST,
           CacheServer.DEFAULT_TCP_NO_DELAY);
         fail("Expected an IllegalArgumentExcption due to max conns < min pool size");
       } catch (IllegalArgumentException expected) {
@@ -101,7 +101,7 @@ public class AcceptorImplJUnitTest
           0,
           CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
-          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,0,null,null,false, Collections.EMPTY_LIST,
+          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,null,null,false, Collections.EMPTY_LIST,
           CacheServer.DEFAULT_TCP_NO_DELAY);
         fail("Expected an IllegalArgumentExcption due to max conns of zero");
       } catch (IllegalArgumentException expected) {
@@ -118,7 +118,7 @@ public class AcceptorImplJUnitTest
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS,
           CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
-          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,0,null,null,false, Collections.EMPTY_LIST,
+          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,null,null,false, Collections.EMPTY_LIST,
           CacheServer.DEFAULT_TCP_NO_DELAY);
         a2 = new AcceptorImpl(
           port1,
@@ -130,7 +130,7 @@ public class AcceptorImplJUnitTest
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS,
           CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
-          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,0,null,null,false, Collections.EMPTY_LIST,
+          CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,null,null,false, Collections.EMPTY_LIST,
           CacheServer.DEFAULT_TCP_NO_DELAY);
         fail("Expecetd a BindException while attaching to the same port");
       } catch (BindException expected) {
@@ -146,7 +146,7 @@ public class AcceptorImplJUnitTest
         AcceptorImpl.MINIMUM_MAX_CONNECTIONS,
         CacheServer.DEFAULT_MAX_THREADS,
         CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
-        CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,0,null,null, false, Collections.EMPTY_LIST,
+        CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,null,null, false, Collections.EMPTY_LIST,
         CacheServer.DEFAULT_TCP_NO_DELAY);
       assertEquals(port2, a3.getPort());
       InternalDistributedSystem isystem = (InternalDistributedSystem) this.cache.getDistributedSystem();


Mime
View raw message