geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bschucha...@apache.org
Subject incubator-geode git commit: GEODE-1761 Clients don't fail back when servers are bounced
Date Fri, 19 Aug 2016 18:39:17 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 79620807b -> cea5535cf


GEODE-1761 Clients don't fail back when servers are bounced

Servers will send a refresh hint to clients if they detect that a request
had to be send to a different server who owned the primary bucket
affected by the operation.  Clients should always refresh when this
happens unless they have connection-pool size constraints that force
them to use non-optimal servers.

Client-side operation classes have been modified to initiate the refresh.
I've added code in the meta-data service class to avoid performing
multiple concurrent refreshes on the same region.

On the server-side I've cleaned up some of the network-hop detection
code to stop using hard-coded integers and to consolidate some of
the code that resets the ThreadLocals being used to record network-
hops detected.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/cea5535c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/cea5535c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/cea5535c

Branch: refs/heads/develop
Commit: cea5535cf2e0de1ee1ef8ea902ea44a2f48b4fd5
Parents: 7962080
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Fri Aug 19 11:37:39 2016 -0700
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Fri Aug 19 11:39:01 2016 -0700

----------------------------------------------------------------------
 .../cache/client/internal/AbstractOp.java       | 11 +++
 .../client/internal/ClientMetadataService.java  | 40 +++++++----
 .../cache/client/internal/DestroyOp.java        | 22 +++---
 .../client/internal/GetClientPRMetaDataOp.java  | 24 +++----
 .../gemfire/cache/client/internal/GetOp.java    | 18 ++---
 .../gemfire/cache/client/internal/PutAllOp.java |  7 +-
 .../gemfire/cache/client/internal/PutOp.java    | 46 +++---------
 .../cache/client/internal/RemoveAllOp.java      |  7 +-
 .../internal/SingleHopOperationCallable.java    |  1 +
 .../gemfire/internal/cache/CachePerfStats.java  |  2 +-
 .../internal/cache/PartitionedRegion.java       | 73 ++++++++++++++------
 .../cache/tier/sockets/BaseCommand.java         |  4 +-
 .../internal/cache/tier/sockets/Message.java    |  2 -
 .../cache/tier/sockets/command/Destroy.java     |  8 +--
 .../cache/tier/sockets/command/Destroy65.java   |  7 +-
 .../cache/tier/sockets/command/Destroy70.java   |  2 +-
 .../cache/tier/sockets/command/Get70.java       |  9 ++-
 .../cache/tier/sockets/command/Invalidate.java  |  8 +--
 .../tier/sockets/command/Invalidate70.java      |  2 +-
 .../cache/tier/sockets/command/Put61.java       |  7 +-
 .../cache/tier/sockets/command/Put65.java       | 13 ++--
 .../cache/tier/sockets/command/Put70.java       |  2 +-
 .../cache/tier/sockets/command/PutAll.java      |  7 +-
 .../cache/tier/sockets/command/PutAll70.java    |  9 ++-
 .../cache/tier/sockets/command/PutAll80.java    |  9 ++-
 .../cache/tier/sockets/command/RemoveAll.java   |  9 ++-
 .../cache/tier/sockets/command/Request.java     |  7 +-
 .../PartitionedRegionSingleHopDUnitTest.java    |  5 +-
 .../internal/cache/SingleHopStatsDUnitTest.java | 17 +++--
 29 files changed, 198 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
index 1eb0dbd..f93620e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/AbstractOp.java
@@ -50,6 +50,8 @@ public abstract class AbstractOp implements Op {
   
   private final Message msg;
 
+  private boolean allowDuplicateMetadataRefresh;
+
   protected AbstractOp(int msgType, int msgParts) {
     this.msg = new Message(msgParts, Version.CURRENT);
     getMessage().setMessageType(msgType);
@@ -301,6 +303,15 @@ public abstract class AbstractOp implements Op {
       }
     }
   }
+
+  public boolean isAllowDuplicateMetadataRefresh() {
+    return allowDuplicateMetadataRefresh;
+  }
+
+  public void setAllowDuplicateMetadataRefresh(final boolean allowDuplicateMetadataRefresh) {
+    this.allowDuplicateMetadataRefresh = allowDuplicateMetadataRefresh;
+  }
+
   /**
    * Used by subclasses who get chunked responses.
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
index 6e255c4..4005b78 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ClientMetadataService.java
@@ -20,10 +20,7 @@ import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
-import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
-import com.gemstone.gemfire.internal.cache.EntryOperationImpl;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.PartitionedRegionHelper;
+import com.gemstone.gemfire.internal.cache.*;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import org.apache.logging.log4j.Logger;
@@ -62,7 +59,9 @@ public final class ClientMetadataService {
 
   private boolean isMetadataRefreshed_TEST_ONLY = false;
   
-  private int fetchTaskCount = 0;
+  private int refreshTaskCount = 0;
+  
+  private Set<String> regionsBeingRefreshed = new HashSet<>();
   
   private final Object fetchTaskCountLock = new Object();
   
@@ -521,7 +520,7 @@ public final class ClientMetadataService {
     }
     else {
       synchronized (fetchTaskCountLock){
-        fetchTaskCount++;
+        refreshTaskCount++;
       }
       Runnable fetchTask = new Runnable() {
         @SuppressWarnings("synthetic-access")
@@ -541,7 +540,7 @@ public final class ClientMetadataService {
           }
           finally {
             synchronized (fetchTaskCountLock){
-              fetchTaskCount--;
+              refreshTaskCount--;
             }
           }
         }
@@ -610,14 +609,19 @@ public final class ClientMetadataService {
     ClientPartitionAdvisor advisor = this.getClientPartitionAdvisor(region.getFullPath());
     if(advisor!= null && advisor.getServerGroup().length()!= 0 && HONOUR_SERVER_GROUP_IN_PR_SINGLE_HOP){
       if (logger.isDebugEnabled()) {
-        logger.debug("Scheduling metadata refresh : {}", nwHopType);
+        logger.debug("Scheduling metadata refresh: {} region: {}", nwHopType, region.getName());
       }
-      if(nwHopType == (byte)2){
+      if( nwHopType == PartitionedRegion.NETWORK_HOP_TO_DIFFERENT_GROUP){
+        return;
+      }
+    }
+    synchronized (fetchTaskCountLock) {
+      if (regionsBeingRefreshed.contains(region.getFullPath())) {
         return;
       }
     }
-    region.getCachePerfStats().incNonSingleHopsCount();
     if (isRecursive) {
+      region.getCachePerfStats().incNonSingleHopsCount();
       try {
         getClientPRMetadata(region);
       } catch (VirtualMachineError e) {
@@ -630,8 +634,13 @@ public final class ClientMetadataService {
         }
       }
     } else {
-      synchronized (fetchTaskCountLock){
-        fetchTaskCount++;
+      synchronized (fetchTaskCountLock) {
+        if (regionsBeingRefreshed.contains(region.getFullPath())) {
+          return;
+        }
+        region.getCachePerfStats().incNonSingleHopsCount();
+        regionsBeingRefreshed.add(region.getFullPath());
+        refreshTaskCount++;
       }
       Runnable fetchTask = new Runnable() {
         @SuppressWarnings("synthetic-access")
@@ -649,7 +658,8 @@ public final class ClientMetadataService {
           }
           finally {
             synchronized (fetchTaskCountLock){
-              fetchTaskCount--;
+              regionsBeingRefreshed.remove(region.getFullPath());
+              refreshTaskCount--;
             }
           }
         }
@@ -849,9 +859,9 @@ public final class ClientMetadataService {
     this.isMetadataStable = isMetadataStable;
   }
 
-  public int getFetchTaskCount() {
+  public int getRefreshTaskCount() {
     synchronized(fetchTaskCountLock) {
-      return fetchTaskCount;
+      return refreshTaskCount;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
index e8ce1a7..ab9b2d1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/DestroyOp.java
@@ -64,7 +64,7 @@ public class DestroyOp {
     if (logger.isDebugEnabled()) {
       logger.debug("Preparing DestroyOp for {} operation={}", key, operation);
     }
-    AbstractOp op = new DestroyOpImpl(region, key, expectedOldValue,
+    DestroyOpImpl op = new DestroyOpImpl(region, key, expectedOldValue,
         operation, event, callbackArg, prSingleHopEnabled);
     if (prSingleHopEnabled) {
       ClientMetadataService cms = region.getCache()
@@ -77,6 +77,7 @@ public class DestroyOp {
           boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl
               .getConnectionCount() >= poolImpl.getMaxConnections()) ? true
               : false);
+          op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx);
           return pool.executeOn(server, op, true, onlyUseExistingCnx);
         }
         catch (AllConnectionsInUseException e) {
@@ -140,10 +141,10 @@ public class DestroyOp {
 
     private boolean prSingleHopEnabled = false;
     
-    private Object callbackArg;
-    
     private EntryEventImpl event;
     
+    private Object callbackArg;
+
     /**
      * @throws com.gemstone.gemfire.SerializationException if serialization fails
      */
@@ -178,6 +179,7 @@ public class DestroyOp {
       super(MessageType.DESTROY, callbackArg != null ? 6 : 5);
       this.key = key;
       this.event = event;
+      this.callbackArg = callbackArg;
       getMessage().addStringPart(region);
       getMessage().addStringOrObjPart(key);
       getMessage().addObjPart(expectedOldValue);
@@ -215,7 +217,6 @@ public class DestroyOp {
         }
       }
       if (prSingleHopEnabled) {
-        byte version = 0 ;
 //        if (log.fineEnabled()) {
 //          log.fine("reading prSingleHop part #" + (partIdx+1));
 //        }
@@ -224,18 +225,17 @@ public class DestroyOp {
         if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
             && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
           if (this.region != null) {
-            ClientMetadataService cms = null;
             try {
-              cms = region.getCache().getClientMetadataService();
-              version = cms.getMetaDataVersion(region, Operation.UPDATE,
-                  key, null, callbackArg);
+              ClientMetadataService cms = region.getCache().getClientMetadataService();
+              int myVersion = cms.getMetaDataVersion(region, Operation.UPDATE,
+                key, null, callbackArg);
+              if (myVersion != bytesReceived[0] || isAllowDuplicateMetadataRefresh()) {
+                cms.scheduleGetPRMetaData(region, false, bytesReceived[1]);
+              }
             }
             catch (CacheClosedException e) {
               return null;
             }
-            if (bytesReceived[0] != version) {
-              cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
-            }
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
index 9a467f7..240aabb 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetClientPRMetaDataOp.java
@@ -104,22 +104,22 @@ public class GetClientPRMetaDataOp {
           for (int i = 0; i < numParts; i++) {
             Object result = msg.getPart(i).getObject();
             List<BucketServerLocation66> locations = (List<BucketServerLocation66>)result;
-          if (!locations.isEmpty()) {
-            int bucketId = locations.get(0).getBucketId();
-            if (isDebugEnabled) {
-              logger.debug("GetClientPRMetaDataOpImpl#processResponse: for bucketId : {} locations are {}", bucketId, locations);
-            }
-            advisor.updateBucketServerLocations(bucketId, locations, cms);
-            
-            Set<ClientPartitionAdvisor> cpas = cms
+            if (!locations.isEmpty()) {
+              int bucketId = locations.get(0).getBucketId();
+              if (isDebugEnabled) {
+                logger.debug("GetClientPRMetaDataOpImpl#processResponse: for bucketId : {} locations are {}", bucketId, locations);
+              }
+              advisor.updateBucketServerLocations(bucketId, locations, cms);
+
+              Set<ClientPartitionAdvisor> cpas = cms
                 .getColocatedClientPartitionAdvisor(regionFullPath);
-            if (cpas != null && !cpas.isEmpty()) {
-              for (ClientPartitionAdvisor colCPA : cpas) {
-                colCPA.updateBucketServerLocations(bucketId, locations, cms);
+              if (cpas != null && !cpas.isEmpty()) {
+                for (ClientPartitionAdvisor colCPA : cpas) {
+                  colCPA.updateBucketServerLocations(bucketId, locations, cms);
+                }
               }
             }
           }
-          }
           if (isDebugEnabled) {
             logger.debug("GetClientPRMetaDataOpImpl#processResponse: received ClientPRMetadata from server successfully.");
           }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java
index 6864306..00f81fd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/GetOp.java
@@ -62,7 +62,7 @@ public class GetOp {
       Object key, Object callbackArg, boolean prSingleHopEnabled, EntryEventImpl clientEvent) {
     ClientMetadataService cms = ((GemFireCacheImpl)region.getCache())
         .getClientMetadataService();
-    AbstractOp op = new GetOpImpl(region, key, callbackArg,
+    GetOpImpl op = new GetOpImpl(region, key, callbackArg,
         prSingleHopEnabled, clientEvent);
 
     if (logger.isDebugEnabled()) {
@@ -77,6 +77,7 @@ public class GetOp {
             boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl
                 .getConnectionCount() >= poolImpl.getMaxConnections()) ? true
                 : false);
+            op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx);
             return pool.executeOn(new ServerLocation(server.getHostName(),
                 server.getPort()), op, true, onlyUseExistingCnx);
           }
@@ -113,7 +114,7 @@ public class GetOp {
     private Object callbackArg;
 
     private EntryEventImpl clientEvent;
-    
+
     public String toString() {
       return "GetOpImpl(key="+key+")";
     }
@@ -182,18 +183,17 @@ public class GetOp {
               byte[] bytesReceived = part.getSerializedForm();
               if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
                   && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
-                ClientMetadataService cms;
                 try {
-                  cms = region.getCache().getClientMetadataService();
-                  version = cms.getMetaDataVersion(region, Operation.UPDATE, key,
-                      null, callbackArg);
+                  ClientMetadataService cms = region.getCache().getClientMetadataService();
+                  int myVersion = cms.getMetaDataVersion(region, Operation.UPDATE,
+                    key, null, callbackArg);
+                  if (myVersion != bytesReceived[0] || isAllowDuplicateMetadataRefresh()) {
+                    cms.scheduleGetPRMetaData(region, false, bytesReceived[1]);
+                  }
                 }
                 catch (CacheClosedException e) {
                   return null;
                 }
-                if (bytesReceived[0] != version) {
-                  cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
-                }
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
index 1610456..112d533 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutAllOp.java
@@ -358,12 +358,11 @@ public class PutAllOp {
                                      } else if (o instanceof byte[]) {
                                        if (prSingleHopEnabled) {
                                          byte[] bytesReceived = part.getSerializedForm();
-                                         if (/*bytesReceived.length==1 &&*/ bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop
+                                         if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop
                                            if (region != null) {
-                                             ClientMetadataService cms;
                                              try {
-                                               cms = region.getCache().getClientMetadataService();
-                                               cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+                                               ClientMetadataService cms = region.getCache().getClientMetadataService();
+                                               cms.scheduleGetPRMetaData(region, false, bytesReceived[1]);
                                              }
                                              catch (CacheClosedException e) {
                                              }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
index 072ec4e..35760b5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PutOp.java
@@ -69,9 +69,8 @@ public class PutOp {
                              Operation operation,
                              boolean requireOldValue, Object expectedOldValue,
                              Object callbackArg,
-                             boolean prSingleHopEnabled)
-  {
-    AbstractOp op = new PutOpImpl(region, key, value, deltaBytes, event,
+                             boolean prSingleHopEnabled) {
+    PutOpImpl op = new PutOpImpl(region, key, value, deltaBytes, event,
         operation, requireOldValue,
         expectedOldValue, callbackArg,
         false/*donot send full obj; send delta*/, prSingleHopEnabled);
@@ -86,6 +85,7 @@ public class PutOp {
           boolean onlyUseExistingCnx = ((poolImpl.getMaxConnections() != -1 && poolImpl
               .getConnectionCount() >= poolImpl.getMaxConnections()) ? true
               : false);
+          op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx);
           return pool.executeOn(new ServerLocation(server.getHostName(), server
               .getPort()), op, true, onlyUseExistingCnx);
         }
@@ -106,6 +106,7 @@ public class PutOp {
       Object key, Object value, byte[] deltaBytes, EntryEventImpl event, Operation operation,
       boolean requireOldValue, Object expectedOldValue,
       Object callbackArg, boolean prSingleHopEnabled, boolean isMetaRegionPutOp) {
+
     AbstractOp op = new PutOpImpl(regionName, key, value, deltaBytes, event,
         operation, requireOldValue,
         expectedOldValue, callbackArg,
@@ -135,8 +136,7 @@ public class PutOp {
                              Object value,
                              EntryEventImpl event,
                              Object callbackArg,
-                             boolean prSingleHopEnabled)
-  {
+                             boolean prSingleHopEnabled) {
     AbstractOp op = new PutOpImpl(regionName, key, value, null,
         event, Operation.CREATE, false,
         null, callbackArg, false /*donot send full Obj; send delta*/, prSingleHopEnabled);
@@ -181,6 +181,7 @@ public class PutOp {
 
     private Object expectedOldValue;
     
+    
     public PutOpImpl(String regionName , Object key, Object value, byte[] deltaBytes, 
         EntryEventImpl event,
         Operation op, boolean requireOldValue,
@@ -311,30 +312,6 @@ public class PutOp {
     @Override
     protected Object processResponse(Message msg) throws Exception {
       throw new UnsupportedOperationException("processResponse should not be invoked in PutOp.  Use processResponse(Message, Connection)");
-//      processAck(msg, "put");
-//      if (prSingleHopEnabled) {
-//        byte version = 0;
-//        Part part = msg.getPart(0);
-//        byte[] bytesReceived = part.getSerializedForm();
-//        if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
-//            && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) { // nw hop
-//          if (this.region != null) {
-//            ClientMetadataService cms;
-//            try {
-//              cms = region.getCache().getClientMetadataService();
-//              version = cms.getMetaDataVersion(region, Operation.UPDATE,
-//                  key, value, callbackArg);
-//            }
-//            catch (CacheClosedException e) {
-//              return null;
-//            }
-//            if (bytesReceived[0] != version) {
-//              cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
-//            }
-//          }
-//        }
-//      }
-//      return null;
     }
 
     /*
@@ -353,18 +330,17 @@ public class PutOp {
     protected Object processResponse(Message msg, Connection con)
         throws Exception {
       processAck(msg, "put", con);
-      byte version = 0 ;
+
       if (prSingleHopEnabled) {
         Part part = msg.getPart(0);
         byte[] bytesReceived = part.getSerializedForm();
         if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
             && bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
           if (this.region != null) {
-            ClientMetadataService cms;
-              cms = region.getCache().getClientMetadataService();
-              version = cms.getMetaDataVersion(region, Operation.UPDATE,
-                  key, value, callbackArg);
-            if (bytesReceived[0] != version) {
+            ClientMetadataService cms = region.getCache().getClientMetadataService();
+            byte myVersion = cms.getMetaDataVersion(region, Operation.UPDATE,
+                               key, value, callbackArg);
+            if (myVersion != bytesReceived[0] || isAllowDuplicateMetadataRefresh()) {
               cms.scheduleGetPRMetaData(region, false, bytesReceived[1]);
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
index 1ab1ed3..7e62d00 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RemoveAllOp.java
@@ -313,12 +313,11 @@ public class RemoveAllOp {
                                      } else if (o instanceof byte[]) {
                                        if (prSingleHopEnabled) {
                                          byte[] bytesReceived = part.getSerializedForm();
-                                         if (/*bytesReceived.length==1 &&*/ bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) { // nw hop
+                                         if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION) {
                                            if (region != null) {
-                                             ClientMetadataService cms;
                                              try {
-                                               cms = region.getCache().getClientMetadataService();
-                                               cms.scheduleGetPRMetaData(region, false,bytesReceived[1]);
+                                               ClientMetadataService cms = region.getCache().getClientMetadataService();
+                                               cms.scheduleGetPRMetaData(region, false, bytesReceived[1]);
                                              }
                                              catch (CacheClosedException e) {
                                              }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java
index 6047a50..a42ba8d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopOperationCallable.java
@@ -49,6 +49,7 @@ public class SingleHopOperationCallable implements Callable {
     Object result = null;
     boolean onlyUseExistingCnx = ((pool.getMaxConnections() != -1 && pool
         .getConnectionCount() >= pool.getMaxConnections()) ? true : false);
+    op.setAllowDuplicateMetadataRefresh(! onlyUseExistingCnx);
     try {
       UserAttributes.userAttributes.set(securityAttributes);
       result = this.pool.executeOn(server, op, true, onlyUseExistingCnx);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
index 532bafa..7fa2183 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
@@ -234,7 +234,7 @@ public class CachePerfStats {
     final String reliableRegionsMissingNoAccessDesc = "Current number of regions configured for reliablity that are missing required roles with No access";
     final String clearsDesc = "The total number of times a clear has been done on this cache.";
     final String nonSingleHopsDesc = "Total number of times client request observed more than one hop during operation.";
-    final String metaDataRefreshCountDesc = "Total number of times the meta data is refreshed due to hopping obsevred.";
+    final String metaDataRefreshCountDesc = "Total number of times the meta data is refreshed due to hopping observed.";
     final String conflatedEventsDesc = "Number of events not delivered due to conflation.  Typically this means that the event arrived after a later event was already applied to the cache.";
     final String tombstoneCountDesc = "Number of destroyed entries that are retained for concurrent modification detection";
     final String tombstoneGCCountDesc = "Number of garbage-collections performed on destroyed entries";

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 485b94d..df9ceba 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -24,6 +24,7 @@ import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.cache.TimeoutException;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.client.internal.*;
 import com.gemstone.gemfire.cache.execute.*;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
 import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
@@ -131,6 +132,25 @@ public class PartitionedRegion extends LocalRegion implements
       DistributionConfig.GEMFIRE_PREFIX + "PartitionedRegionRandomSeed", NanoTimer.getTime()).longValue());
   
   private static final AtomicInteger SERIAL_NUMBER_GENERATOR = new AtomicInteger();
+
+  /**
+   * getNetworkHopType byte indicating this was the bucket owner for
+   * the last operation
+   */
+  public static final int NETWORK_HOP_NONE = 0;
+
+  /**
+   * getNetworkHopType byte indicating this was not the bucket owner and
+   * a message had to be sent to a primary in the same server group
+   */
+  public static final int NETWORK_HOP_TO_SAME_GROUP = 1;
+
+  /**
+   * getNetworkHopType byte indicating this was not the bucket owner and
+   * a message had to be sent to a primary in a different server group
+   */
+  public static final int NETWORK_HOP_TO_DIFFERENT_GROUP = 2;
+
   
   private final DiskRegionStats diskRegionStats;
   /**
@@ -325,34 +345,47 @@ public class PartitionedRegion extends LocalRegion implements
    * Byte 0 = no NWHOP Byte 1 = NWHOP to servers in same server-grp Byte 2 =
    * NWHOP tp servers in other server-grp
    */
-  private final ThreadLocal<Byte> isNetworkHop = new ThreadLocal<Byte>() {
+  private final ThreadLocal<Byte> networkHopType = new ThreadLocal<Byte>() {
     @Override
     protected Byte initialValue() {
-      return Byte.valueOf((byte)0);
+      return Byte.valueOf((byte)NETWORK_HOP_NONE);
     }
   };
 
-  public void setIsNetworkHop(Byte value) {
-    this.isNetworkHop.set(value);
+  public void clearNetworkHopData() {
+    this.networkHopType.remove();
+    this.metadataVersion.remove();
+  }
+  
+  private void setNetworkHopType(Byte value) {
+    this.networkHopType.set(value);
   }
 
-  public Byte isNetworkHop() {
-    return this.isNetworkHop.get();
+  /**
+   * <p>
+   * If the last operation in the current thread required a one-hop to
+   * another server who held the primary bucket for the operation then
+   * this will return something other than NETWORK_HOP_NONE.
+   * </p>
+   * see NETWORK_HOP_NONE, NETWORK_HOP_TO_SAME_GROUP and NETWORK_HOP_TO_DIFFERENT_GROUP
+   */
+  public byte getNetworkHopType() {
+    return this.networkHopType.get().byteValue();
   }
   
   private final ThreadLocal<Byte> metadataVersion = new ThreadLocal<Byte>() {
     @Override
     protected Byte initialValue() {
-      return 0;
+      return ClientMetadataService.INITIAL_VERSION;
     }
   };
 
-  public void setMetadataVersion(Byte value) {
+  private void setMetadataVersion(Byte value) {
     this.metadataVersion.set(value);
   }
 
-  public Byte getMetadataVersion() {
-    return this.metadataVersion.get();
+  public byte getMetadataVersion() {
+    return this.metadataVersion.get().byteValue();
   }
       
 
@@ -1464,7 +1497,7 @@ public class PartitionedRegion extends LocalRegion implements
           String name = Thread.currentThread().getName();
           if (name.startsWith("ServerConnection")
               && !getMyId().equals(targetNode)) {
-            setNetworkHop(bucketIdInt, (InternalDistributedMember)targetNode);
+            setNetworkHopType(bucketIdInt, (InternalDistributedMember)targetNode);
           }
         }
         
@@ -1929,7 +1962,7 @@ public class PartitionedRegion extends LocalRegion implements
       }
 
       if (event.isBridgeEvent() && bucketStorageAssigned) {
-        setNetworkHop(bucketId, targetNode);
+        setNetworkHopType(bucketId, targetNode);
       }
       if (putAllOp_save == null) {
         result = putInBucket(targetNode,
@@ -4012,7 +4045,7 @@ public class PartitionedRegion extends LocalRegion implements
           String name = Thread.currentThread().getName();
           if (name.startsWith("ServerConnection")
               && !getMyId().equals(retryNode)) {
-            setNetworkHop(bucketId, (InternalDistributedMember)retryNode);
+            setNetworkHopType(bucketId, (InternalDistributedMember)retryNode);
           }
         }
         return obj;
@@ -5468,7 +5501,7 @@ public class PartitionedRegion extends LocalRegion implements
         }
         else {
           if (event.isBridgeEvent()) {
-            setNetworkHop(bucketId, currentTarget);
+            setNetworkHopType(bucketId, currentTarget);
           }
           destroyRemotely(currentTarget,
                           bucketId,
@@ -5557,8 +5590,8 @@ public class PartitionedRegion extends LocalRegion implements
    * @param targetNode
    */
 
-  private void setNetworkHop(final Integer bucketId,
-      final InternalDistributedMember targetNode) {
+  private void setNetworkHopType(final Integer bucketId,
+                                 final InternalDistributedMember targetNode) {
 
     if (this.isDataStore() && !getMyId().equals(targetNode)) {
       Set<ServerBucketProfile> profiles = this.getRegionAdvisor()
@@ -5569,15 +5602,15 @@ public class PartitionedRegion extends LocalRegion implements
           if (profile.getDistributedMember().equals(targetNode)) {
 
             if (isProfileFromSameGroup(profile)) {
-              if (this.isNetworkHop() != 1 && logger.isDebugEnabled()) {
+              if (this.getNetworkHopType() != NETWORK_HOP_TO_SAME_GROUP && logger.isDebugEnabled()) {
                 logger.debug("one-hop: cache op meta data staleness observed.  Message is in same server group (byte 1)");
               }
-              this.setIsNetworkHop((byte)1);
+              this.setNetworkHopType((byte)NETWORK_HOP_TO_SAME_GROUP);
             } else {
-              if (this.isNetworkHop() != 2 && logger.isDebugEnabled()) {
+              if (this.getNetworkHopType() != NETWORK_HOP_TO_DIFFERENT_GROUP && logger.isDebugEnabled()) {
                 logger.debug("one-hop: cache op meta data staleness observed.  Message is to different server group (byte 2)");
               }
-              this.setIsNetworkHop((byte)2);
+              this.setNetworkHopType((byte)NETWORK_HOP_TO_DIFFERENT_GROUP);
             }
             this.setMetadataVersion((byte)profile.getVersion());
             break;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
index f3485d2..009e869 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/BaseCommand.java
@@ -284,7 +284,7 @@ public abstract class BaseCommand implements Command {
     replyMsg.setMessageType(MessageType.REPLY);
     replyMsg.setNumberOfParts(1);
     replyMsg.setTransactionId(origMsg.getTransactionId());
-    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop});
     replyMsg.send(servConn);
     pr.getPrStats().incPRMetaDataSentCount();
     if (logger.isTraceEnabled()) {
@@ -701,7 +701,7 @@ public abstract class BaseCommand implements Command {
     if (callbackArg != null) {
       responseMsg.addObjPart(callbackArg);
     }
-    responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(),nwHop});
+    responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion(),nwHop});
     servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
     responseMsg.send(servConn);
     origMsg.clearParts();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index 4947e20..b5506d6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@ -16,8 +16,6 @@
  */
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
-import static com.sun.tools.doclint.Entity.part;
-
 import com.gemstone.gemfire.SerializationException;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.internal.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
index bc47e2a..10a72dd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy.java
@@ -29,7 +29,6 @@ import com.gemstone.gemfire.internal.cache.EventID;
 import com.gemstone.gemfire.internal.cache.EventIDHolder;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
 import com.gemstone.gemfire.internal.cache.tier.Command;
 import com.gemstone.gemfire.internal.cache.tier.MessageType;
 import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
@@ -181,10 +180,9 @@ public class Destroy extends BaseCommand {
     }
     if (region instanceof PartitionedRegion) {
       PartitionedRegion pr = (PartitionedRegion) region;
-      if (pr.isNetworkHop() != (byte) 0) {
-        writeReplyWithRefreshMetadata(msg, servConn, pr, pr.isNetworkHop());
-        pr.setIsNetworkHop((byte) 0);
-        pr.setMetadataVersion(Byte.valueOf((byte) 0));
+      if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+        writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType());
+        pr.clearNetworkHopData();
       } else {
         writeReply(msg, servConn);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
index a571f71..6c41fb5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy65.java
@@ -301,10 +301,9 @@ public class Destroy65 extends BaseCommand {
 
     if (region instanceof PartitionedRegion) {
       PartitionedRegion pr = (PartitionedRegion) region;
-      if (pr.isNetworkHop() != (byte) 0) {
-        writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove, pr.isNetworkHop(), clientEvent.getVersionTag());
-        pr.setIsNetworkHop((byte) 0);
-        pr.setMetadataVersion((byte) 0);
+      if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+        writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove, pr.getNetworkHopType(), clientEvent.getVersionTag());
+        pr.clearNetworkHopData();
       } else {
         writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(), clientEvent.getVersionTag());
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java
index 82d9c1a..aa9b865 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Destroy70.java
@@ -65,7 +65,7 @@ public class Destroy70 extends Destroy65 {
     if (versionTag != null) {
       replyMsg.addObjPart(versionTag);
     }
-    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop});
     replyMsg.addIntPart(entryNotFoundForRemove? 1 : 0);
     pr.getPrStats().incPRMetaDataSentCount();
     replyMsg.send(servConn);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
index f5d9937..c3daa64 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Get70.java
@@ -201,11 +201,10 @@ public class Get70 extends BaseCommand {
 
       if (region instanceof PartitionedRegion) {
         PartitionedRegion pr = (PartitionedRegion) region;
-        if (pr.isNetworkHop() != (byte) 0) {
+        if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
           writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject,
-            servConn, pr, pr.isNetworkHop(), versionTag, keyNotPresent);
-          pr.setIsNetworkHop((byte) 0);
-          pr.setMetadataVersion(Byte.valueOf((byte) 0));
+            servConn, pr, pr.getNetworkHopType(), versionTag, keyNotPresent);
+          pr.clearNetworkHopData();
         }
         else {
           writeResponse(data, callbackArg, msg, isObject, versionTag, keyNotPresent, servConn);
@@ -490,7 +489,7 @@ public class Get70 extends BaseCommand {
       responseMsg.addObjPart(versionTag);
     }
 
-    responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(),nwHop});
+    responseMsg.addBytesPart(new byte[]{pr.getMetadataVersion(),nwHop});
     servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
     responseMsg.send(servConn);
     origMsg.clearParts();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
index 166b11a..2faa177 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate.java
@@ -27,7 +27,6 @@ import com.gemstone.gemfire.internal.cache.EventID;
 import com.gemstone.gemfire.internal.cache.EventIDHolder;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.tier.CachedRegionHelper;
 import com.gemstone.gemfire.internal.cache.tier.Command;
 import com.gemstone.gemfire.internal.cache.tier.MessageType;
 import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
@@ -190,10 +189,9 @@ public class Invalidate extends BaseCommand {
     }
     if (region instanceof PartitionedRegion) {
       PartitionedRegion pr = (PartitionedRegion) region;
-      if (pr.isNetworkHop() != (byte) 0) {
-        writeReplyWithRefreshMetadata(msg, servConn, pr, pr.isNetworkHop(), tag);
-        pr.setIsNetworkHop((byte) 0);
-        pr.setMetadataVersion(Byte.valueOf((byte) 0));
+      if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+        writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType(), tag);
+        pr.clearNetworkHopData();
       } else {
         writeReply(msg, servConn, tag);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java
index 6200438..f831999 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Invalidate70.java
@@ -60,7 +60,7 @@ public class Invalidate70 extends Invalidate {
     if (versionTag != null) {
       replyMsg.addObjPart(versionTag);
     }
-    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop});
     pr.getPrStats().incPRMetaDataSentCount();
     replyMsg.send(servConn);
     if (logger.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
index 4529b2d..bfc3f20 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put61.java
@@ -261,10 +261,9 @@ public class Put61 extends BaseCommand {
     // Increment statistics and write the reply
     if (region instanceof PartitionedRegion) {
       PartitionedRegion pr = (PartitionedRegion) region;
-      if (pr.isNetworkHop() != (byte) 0) {
-        writeReplyWithRefreshMetadata(msg, servConn, pr, pr.isNetworkHop());
-        pr.setIsNetworkHop((byte) 0);
-        pr.setMetadataVersion(Byte.valueOf((byte) 0));
+      if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+        writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType());
+        pr.clearNetworkHopData();
       } else {
         writeReply(msg, servConn);
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
index 48d923c..e164ef5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put65.java
@@ -50,7 +50,6 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.security.AuthorizeRequest;
-import com.gemstone.gemfire.internal.security.SecurityService;
 import com.gemstone.gemfire.internal.util.Breadcrumbs;
 import com.gemstone.gemfire.security.GemFireSecurityException;
 
@@ -426,12 +425,10 @@ public class Put65 extends BaseCommand {
     // Increment statistics and write the reply
     if (region instanceof PartitionedRegion) {
       PartitionedRegion pr = (PartitionedRegion) region;
-      if (pr.isNetworkHop().byteValue() != (byte) 0) {
-        writeReplyWithRefreshMetadata(msg, servConn, pr, sendOldValue, oldValueIsObject, oldValue, pr.isNetworkHop()
-                                                                                                     .byteValue(), clientEvent
-          .getVersionTag());
-        pr.setIsNetworkHop((byte) 0);
-        pr.setMetadataVersion(Byte.valueOf((byte) 0));
+      if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+        writeReplyWithRefreshMetadata(msg, servConn, pr, sendOldValue, oldValueIsObject, oldValue, pr.getNetworkHopType()
+          , clientEvent.getVersionTag());
+        pr.clearNetworkHopData();
       } else {
         writeReply(msg, servConn, sendOldValue, oldValueIsObject, oldValue, clientEvent.getVersionTag());
       }
@@ -482,7 +479,7 @@ public class Put65 extends BaseCommand {
     replyMsg.setMessageType(MessageType.REPLY);
     replyMsg.setNumberOfParts(sendOldValue ? 3 : 1);
     replyMsg.setTransactionId(origMsg.getTransactionId());
-    replyMsg.addBytesPart(new byte[] { pr.getMetadataVersion().byteValue(), nwHopType });
+    replyMsg.addBytesPart(new byte[] { pr.getMetadataVersion(), nwHopType });
     if (sendOldValue) {
       replyMsg.addIntPart(oldValueIsObject ? 1 : 0);
       replyMsg.addObjPart(oldValue);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java
index af16bed..bb517d9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Put70.java
@@ -105,7 +105,7 @@ public class Put70 extends Put65 {
     }
     replyMsg.setNumberOfParts(parts);
     replyMsg.setTransactionId(origMsg.getTransactionId());
-    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHopType});
+    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHopType});
     replyMsg.addIntPart(flags);
     if (sendOldValue) {
 //      if (logger.fineEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java
index 955677f..1579a1c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll.java
@@ -189,10 +189,9 @@ public class PutAll extends BaseCommand {
       
       if (region instanceof PartitionedRegion) {
         PartitionedRegion pr = (PartitionedRegion)region;
-        if (pr.isNetworkHop() != (byte)0) {
-          writeReplyWithRefreshMetadata(msg, servConn,pr,pr.isNetworkHop());
-          pr.setIsNetworkHop((byte)0);
-          pr.setMetadataVersion(Byte.valueOf((byte)0));
+        if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+          writeReplyWithRefreshMetadata(msg, servConn,pr,pr.getNetworkHopType());
+          pr.clearNetworkHopData();
           replyWithMetaData = true;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java
index 4e6e167..02bbc40 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll70.java
@@ -250,10 +250,9 @@ public class PutAll70 extends BaseCommand {
       
       if (region instanceof PartitionedRegion) {
         PartitionedRegion pr = (PartitionedRegion)region;
-        if (pr.isNetworkHop().byteValue() != 0) {
-          writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.isNetworkHop());
-          pr.setIsNetworkHop(Byte.valueOf((byte)0));
-          pr.setMetadataVersion(Byte.valueOf((byte)0));
+        if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+          writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType());
+          pr.clearNetworkHopData();
           replyWithMetaData = true;
         }
       }
@@ -349,7 +348,7 @@ public class PutAll70 extends BaseCommand {
     replyMsg.setMessageType(MessageType.REPLY);
     replyMsg.setNumberOfParts(2);
     replyMsg.setTransactionId(origMsg.getTransactionId());
-    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop});
     if (response != null) {
       response.clearObjects();
       replyMsg.addObjPart(response);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java
index 78f5612..beeb3ce 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/PutAll80.java
@@ -291,10 +291,9 @@ public class PutAll80 extends BaseCommand {
       
       if (region instanceof PartitionedRegion) {
         PartitionedRegion pr = (PartitionedRegion)region;
-        if (pr.isNetworkHop().byteValue() != 0) {
-          writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.isNetworkHop());
-          pr.setIsNetworkHop(Byte.valueOf((byte)0));
-          pr.setMetadataVersion(Byte.valueOf((byte)0));
+        if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+          writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType());
+          pr.clearNetworkHopData();
           replyWithMetaData = true;
         }
       }
@@ -417,7 +416,7 @@ public class PutAll80 extends BaseCommand {
     }
     replyMsg.setNumberOfParts(1);
     replyMsg.setTransactionId(origMsg.getTransactionId());
-    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop});
     if (listSize > 0) {
       replyMsg.setLastChunk(false);
       replyMsg.sendChunk(servConn);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java
index 474d942..4203447 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/RemoveAll.java
@@ -217,10 +217,9 @@ public class RemoveAll extends BaseCommand {
       
       if (region instanceof PartitionedRegion) {
         PartitionedRegion pr = (PartitionedRegion)region;
-        if (pr.isNetworkHop().byteValue() != 0) {
-          writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.isNetworkHop());
-          pr.setIsNetworkHop(Byte.valueOf((byte)0));
-          pr.setMetadataVersion(Byte.valueOf((byte)0));
+        if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
+          writeReplyWithRefreshMetadata(msg, response, servConn, pr, pr.getNetworkHopType());
+          pr.clearNetworkHopData();
           replyWithMetaData = true;
         }
       }
@@ -346,7 +345,7 @@ public class RemoveAll extends BaseCommand {
     }
     replyMsg.setNumberOfParts(1);
     replyMsg.setTransactionId(origMsg.getTransactionId());
-    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion().byteValue(), nwHop});
+    replyMsg.addBytesPart(new byte[]{pr.getMetadataVersion(), nwHop});
     if (listSize > 0) {
       replyMsg.setLastChunk(false);
       replyMsg.sendChunk(servConn);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
index c84e189..d19fe44 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/Request.java
@@ -189,11 +189,10 @@ public class Request extends BaseCommand {
         
         if (region instanceof PartitionedRegion) {
           PartitionedRegion pr = (PartitionedRegion)region;
-          if (pr.isNetworkHop() != (byte)0) {
+          if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
             writeResponseWithRefreshMetadata(data, callbackArg, msg, isObject,
-                servConn, pr,pr.isNetworkHop());
-            pr.setIsNetworkHop((byte)0);
-            pr.setMetadataVersion(Byte.valueOf((byte)0));
+                servConn, pr,pr.getNetworkHopType());
+            pr.clearNetworkHopData();
           }
           else {
             writeResponse(data, callbackArg, msg, isObject, servConn);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java
index 8c934d8..78fd5e0 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionSingleHopDUnitTest.java
@@ -35,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import com.jayway.awaitility.Awaitility;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -400,7 +399,7 @@ public class PartitionedRegionSingleHopDUnitTest extends JUnit4CacheTestCase {
     Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.isRefreshMetadataTestOnly() == true);
 
     //make sure all fetch tasks are completed
-    Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.getFetchTaskCount() == 0);
+    Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.getRefreshTaskCount() == 0);
 
     cms.satisfyRefreshMetadata_TEST_ONLY(false);
     region.put(new Integer(0), "create0");
@@ -1965,7 +1964,7 @@ public class PartitionedRegionSingleHopDUnitTest extends JUnit4CacheTestCase {
   private void verifyMetadata() {
     ClientMetadataService cms = ((GemFireCacheImpl)cache).getClientMetadataService();
     //make sure all fetch tasks are completed
-    Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.getFetchTaskCount() == 0);
+    Awaitility.waitAtMost(60, TimeUnit.SECONDS).until(() -> cms.getRefreshTaskCount() == 0);
 
     final Map<String, ClientPartitionAdvisor> regionMetaData = cms
         .getClientPRMetadata_TEST_ONLY();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/cea5535c/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java
index e611086..a3e9d08 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/SingleHopStatsDUnitTest.java
@@ -124,16 +124,19 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
     Integer port1 = (Integer) member1.invoke(() -> createServerForStats(0, 113, "No_Colocation"));
     Integer port2 = (Integer) member2.invoke(() -> createServerForStats(0, 113, "No_Colocation"));
 
-    member3.invoke(() -> createClient(port0, port1, port2, "No_Colocation"));
+    member3.invoke("createClient", () -> createClient(port0, port1, port2, "No_Colocation"));
+    System.out.println("createClient");
     createClient(port0, port1, port2, "No_Colocation");
 
-    member3.invoke(() -> createPR("FirstClient", "No_Colocation"));
+    member3.invoke("createPR", () -> createPR("FirstClient", "No_Colocation"));
+    System.out.println("createPR");
     createPR("SecondClient", "No_Colocation");
 
-    member3.invoke(() -> getPR("FirstClient", "No_Colocation"));
+    member3.invoke("getPR", () -> getPR("FirstClient", "No_Colocation"));
+    System.out.println("getPR");
     getPR("SecondClient", "No_Colocation");
 
-    member3.invoke(() -> updatePR("FirstClient", "No_Colocation"));
+    member3.invoke("updatePR", () -> updatePR("FirstClient", "No_Colocation"));
   }
 
   @Test
@@ -163,7 +166,7 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
           .addServer("localhost", port1).addServer("localhost", port2)
           .setRetryAttempts(5)
           .setMinConnections(1)
-          .setMaxConnections(1)
+          .setMaxConnections(-1)
           .setSubscriptionEnabled(false)
           .create(Region_Name);
     } finally {
@@ -325,7 +328,10 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
         nonSingleHopsCount = ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount();
         assertTrue(metaDataRefreshCount != 0); // hops are not predictable
         assertTrue(nonSingleHopsCount != 0);
+        
+        System.out.println("metadata refresh count after second pass is " + metaDataRefreshCount);
       } else {
+        System.out.println("creating keys in second client");
         for (int i = 0; i < 226; i++) {
           region.create(new Integer(i), "create" + i);
         }
@@ -341,6 +347,7 @@ public class SingleHopStatsDUnitTest extends JUnit4CacheTestCase {
         nonSingleHopsCount = ((LocalRegion) region).getCachePerfStats().getNonSingleHopsCount();
         assertTrue(metaDataRefreshCount != 0); // hops are not predictable
         assertTrue(nonSingleHopsCount != 0);
+        System.out.println("metadata refresh count in second client is " + metaDataRefreshCount);
       }
     } else {
       createdColocatedPRData(cache);


Mime
View raw message