geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hiteshkhame...@apache.org
Subject incubator-geode git commit: GEODE-1723 Merge from 82 for performance improvement.
Date Wed, 03 Aug 2016 17:02:47 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 73e413bf6 -> 0e8e5945b


GEODE-1723 Merge from 82 for performance improvement.

Other improvements: We take a lock on the key while performing an operation
on a BucketRegion. When that lock is released, we now notify other threads
only if a thread is actually waiting on it. Also reordered one condition that
guards a log message to reduce garbage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0e8e5945
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0e8e5945
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0e8e5945

Branch: refs/heads/develop
Commit: 0e8e5945b0f1105199509887880ca3656f222fe4
Parents: 73e413b
Author: Hitesh Khamesra <hkhamesra@pivotal.io>
Authored: Wed Aug 3 10:01:13 2016 -0700
Committer: Hitesh Khamesra <hkhamesra@pivotal.io>
Committed: Wed Aug 3 10:03:17 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractRegionMap.java       |  61 +++++++---
 .../gemfire/internal/cache/BucketRegion.java    |   5 +-
 .../cache/DistributedCacheOperation.java        |   4 +-
 .../internal/cache/FilterRoutingInfo.java       |  39 ++++---
 .../internal/cache/ha/HAContainerMap.java       |  65 ++++-------
 .../internal/cache/ha/HAContainerRegion.java    |  31 ++---
 .../internal/cache/ha/HAContainerWrapper.java   |   3 +-
 .../internal/cache/ha/HARegionQueue.java        | 102 ++++++++++-------
 .../internal/cache/partitioned/LockObject.java  |  22 +++-
 .../cache/tier/sockets/CacheClientNotifier.java |   6 +-
 .../tier/sockets/ClientUpdateMessageImpl.java   | 113 +++++++++++--------
 .../internal/cache/tier/sockets/Message.java    |  52 ++++++++-
 .../internal/cache/tier/sockets/Part.java       |  13 ++-
 .../sanctionedDataSerializables.txt             |  14 +--
 .../cache/query/internal/cq/CqServiceImpl.java  |  97 ++++++++++------
 .../cache/query/cq/dunit/CqPerfDUnitTest.java   |   3 +-
 .../cq/dunit/CqPerfUsingPoolDUnitTest.java      |   3 +-
 17 files changed, 399 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index f3cb3d6..2e87c2e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -36,6 +36,7 @@ import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
 import com.gemstone.gemfire.internal.cache.region.entry.RegionEntryFactoryBuilder;
 import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessageImpl;
 import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
 import com.gemstone.gemfire.internal.cache.versions.*;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
@@ -752,31 +753,65 @@ public abstract class AbstractRegionMap implements RegionMap {
         if (haContainer == null) {
           return false;
         }
-        Map.Entry entry = null;
         HAEventWrapper original = null;
-        synchronized (haContainer) {
-          entry = (Map.Entry)haContainer.getEntry(haEventWrapper);
-          if (entry != null) {
-            original = (HAEventWrapper)entry.getKey();
-            original.incAndGetReferenceCount();
+//      synchronized (haContainer) {
+      do {
+        ClientUpdateMessageImpl oldMsg = (ClientUpdateMessageImpl) haContainer
+            .putIfAbsent(haEventWrapper,
+                haEventWrapper.getClientUpdateMessage());
+        if (oldMsg != null) {
+          original = (HAEventWrapper) haContainer.getKey(haEventWrapper);
+          if (original == null) {
+            continue;
           }
-          else {
+          synchronized (original) {
+            if ((HAEventWrapper) haContainer.getKey(original) != null) {
+              original.incAndGetReferenceCount();
+              HARegionQueue.addClientCQsAndInterestList(oldMsg,
+                  haEventWrapper, haContainer, owner.getName());
+              haEventWrapper.setClientUpdateMessage(null);
+              newValue = CachedDeserializableFactory.create(original,
+                  ((CachedDeserializable) newValue).getSizeInBytes());
+            } else {
+              original = null;
+            }
+          }
+        } else { // putIfAbsent successful
+          synchronized (haEventWrapper) {
             haEventWrapper.incAndGetReferenceCount();
             haEventWrapper.setHAContainer(haContainer);
-            haContainer.put(haEventWrapper, haEventWrapper
-                .getClientUpdateMessage());
             haEventWrapper.setClientUpdateMessage(null);
             haEventWrapper.setIsRefFromHAContainer(true);
           }
+          break;
         }
+        // try until we either get a reference to HAEventWrapper from
+        // HAContainer or successfully put one into it.
+      } while (original == null);
+      /*
+        entry = (Map.Entry)haContainer.getEntry(haEventWrapper);
         if (entry != null) {
-          HARegionQueue.addClientCQsAndInterestList(entry, haEventWrapper,
-              haContainer, owner.getName());
+          original = (HAEventWrapper)entry.getKey();
+          original.incAndGetReferenceCount();
+        }
+        else {
+          haEventWrapper.incAndGetReferenceCount();
+          haEventWrapper.setHAContainer(haContainer);
+          haContainer.put(haEventWrapper, haEventWrapper
+              .getClientUpdateMessage());
           haEventWrapper.setClientUpdateMessage(null);
-          newValue = CachedDeserializableFactory.create(original,
-              ((CachedDeserializable)newValue).getSizeInBytes());
+          haEventWrapper.setIsRefFromHAContainer(true);
         }
       }
+      if (entry != null) {
+        HARegionQueue.addClientCQsAndInterestList(entry, haEventWrapper,
+            haContainer, owner.getName());
+        haEventWrapper.setClientUpdateMessage(null);
+        newValue = CachedDeserializableFactory.create(original,
+            ((CachedDeserializable)newValue).getSizeInBytes());
+      }
+*/
+      }
     }
     
     try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index abe38b6..f1627d3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -381,6 +381,7 @@ implements Bucket
           if (isDebugEnabled) {
             logger.debug("LockKeys: found key: {}:{}", keys[i], foundLock.lockedTimeStamp);
           }
+          foundLock.waiting();
           break;
         }
       }
@@ -421,7 +422,9 @@ implements Bucket
               long waitTime = System.currentTimeMillis()-lockValue.lockedTimeStamp;
               logger.trace("LockKeys: remove key {}, notifyAll for {}. It waited", keys[i], lockValue, waitTime);
             }
-            lockValue.notifyAll();
+            if (lockValue.isSomeoneWaiting()) {
+              lockValue.notifyAll();
+            }
           }
         }
       } // for

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index f51717d..4ca60f9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -352,8 +352,8 @@ public abstract class DistributedCacheOperation {
 
       if (recipients.isEmpty() && adjunctRecipients.isEmpty()
           && needsOldValueInCacheOp.isEmpty() && cachelessNodes.isEmpty()) {
-        if (mgr.getNormalDistributionManagerIds().size() > 1) {
-          if (region.isInternalRegion()) {
+        if (region.isInternalRegion()) {
+          if (mgr.getNormalDistributionManagerIds().size() > 1) {
             // suppress this msg if we are the only member.
             if (logger.isTraceEnabled()) {
               logger.trace("<No Recipients> {}", this);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterRoutingInfo.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterRoutingInfo.java
index 7f5b587..46fd295 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterRoutingInfo.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterRoutingInfo.java
@@ -362,24 +362,29 @@ public class FilterRoutingInfo implements VersionedDataSerializable {
     }
     
     public void toData(DataOutput out) throws IOException {
-        HeapDataOutputStream hdos = new HeapDataOutputStream(1000, null);
-        if (this.cqs == null) {
-          hdos.writeBoolean(false);
-        } else {
-          hdos.writeBoolean(true);
-          InternalDataSerializer.writeArrayLength(cqs.size(), hdos);
-          for (Iterator it=this.cqs.entrySet().iterator(); it.hasNext(); ) {
-            Map.Entry e = (Map.Entry)it.next();
-            // most cq IDs and all event types are small ints, so we use an optimized
-            // write that serializes 7 bits at a time in a compact form
-            InternalDataSerializer.writeUnsignedVL(((Long)e.getKey()).longValue(), hdos);
-            InternalDataSerializer.writeUnsignedVL(((Integer)e.getValue()).intValue(), hdos);
-          }
+      HeapDataOutputStream hdos;
+      int size = 9;
+      size += interestedClients == null ? 4 : interestedClients.size() * 8 + 5;
+      size += interestedClientsInv == null ? 4 : interestedClientsInv.size() * 8 + 5;
+      size += cqs == null ? 0 : cqs.size() * 12;
+      hdos = new HeapDataOutputStream(size, null);
+      if (this.cqs == null) {
+        hdos.writeBoolean(false);
+      } else {
+        hdos.writeBoolean(true);
+        InternalDataSerializer.writeArrayLength(cqs.size(), hdos);
+        for (Iterator it = this.cqs.entrySet().iterator(); it.hasNext();) {
+          Map.Entry e = (Map.Entry) it.next();
+          // most cq IDs and all event types are small ints, so we use an optimized
+          // write that serializes 7 bits at a time in a compact form
+          InternalDataSerializer.writeUnsignedVL(((Long) e.getKey()).longValue(), hdos);
+          InternalDataSerializer.writeUnsignedVL(((Integer) e.getValue()).intValue(), hdos);
         }
-        InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos);
-        InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos);
-        byte[] myData = hdos.toByteArray();
-        DataSerializer.writeByteArray(myData, out);
+      }
+      InternalDataSerializer.writeSetOfLongs(this.interestedClients, this.longIDs, hdos);
+      InternalDataSerializer.writeSetOfLongs(this.interestedClientsInv, this.longIDs, hdos);
+      byte[] myData = hdos.toByteArray();
+      DataSerializer.writeByteArray(myData, out);
     }
     
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerMap.java
index 084140f..8f8be30 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerMap.java
@@ -37,7 +37,7 @@ public class HAContainerMap implements HAContainerWrapper {
    * TODO: Amogh: Using ConcurrentHashMap may be beneficial. It gives us
    * putEntryIfAbsent()!
    */
-  private Map map = null;
+  private ConcurrentHashMap map = null;
 
   /**
    * This map helps us retrieve the proxy id at the receiver side during GII so
@@ -46,7 +46,7 @@ public class HAContainerMap implements HAContainerWrapper {
    */
   private final Map<String, CacheClientProxy> haRegionNameToProxy;
 
-  public HAContainerMap(HashMap containerMap) {
+  public HAContainerMap(ConcurrentHashMap containerMap) {
     map = containerMap;
     haRegionNameToProxy = new ConcurrentHashMap<String, CacheClientProxy>();
   }
@@ -78,12 +78,10 @@ public class HAContainerMap implements HAContainerWrapper {
    * @param key
    * @return Object
    */
-  public Object getKey(Object key) {
-    synchronized (map) {
-      Entry entry = (Entry)map.get(key);
-      return (entry == null) ? null : entry.getKey();
-    }
-  }
+	public Object getKey(Object key) {
+		Entry entry = (Entry) map.get(key);
+		return (entry == null) ? null : entry.getKey();
+	}
 
   public String getName() {
     return "HashMap";
@@ -95,15 +93,11 @@ public class HAContainerMap implements HAContainerWrapper {
   }
 
   public void clear() {
-    synchronized (map) {
-      map.clear();
-    }
+    map.clear();
   }
 
   public boolean containsKey(Object key) {
-    synchronized (map) {
-      return map.containsKey(key);
-    }
+    return map.containsKey(key);
   }
 
   public boolean containsValue(Object value) {
@@ -117,55 +111,44 @@ public class HAContainerMap implements HAContainerWrapper {
   }
 
   public Object get(Object key) {
-    synchronized (map) {
-      Entry entry = (Entry)map.get(key);
-      return (entry == null) ? null : entry.getValue();
-    }
+    Entry entry = (Entry)map.get(key);
+    return (entry == null) ? null : entry.getValue();
   }
 
   public Object getEntry(Object key) {
-    synchronized (map) {
-      return map.get(key);
-    }
+    return map.get(key);
   }
 
   public boolean isEmpty() {
-    synchronized (map) {
-      return map.isEmpty();
-    }
+    return map.isEmpty();
   }
 
   public Set keySet() {
-    synchronized (map) {
-      return map.keySet();
-    }
+    return map.keySet();
   }
 
   public Object put(Object key, Object value) {
-    Entry entry = new Entry(key, value);
-    synchronized (map) {
-      return map.put(key, entry);
-    }
+    Entry old = (Entry) map.put(key, new Entry(key, value));
+    return old != null ? old.getValue() : null;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Object putIfAbsent(Object key, Object value) {
+    Entry old = (Entry) map.putIfAbsent(key, new Entry(key, value));
+    return old != null ? old.getValue() : null;
   }
 
   public void putAll(Map t) {
-    //synchronized (map) {
-    //  map.putAll(t);
-    //}
     throw new UnsupportedOperationException("putAll() not supported.");
   }
 
   public Object remove(Object key) {
-    synchronized (map) {
-      Entry entry = (Entry)map.remove(key);
-      return (entry == null) ? null : entry.getValue();
-    }
+    Entry entry = (Entry)map.remove(key);
+    return (entry == null) ? null : entry.getValue();
   }
 
   public int size() {
-    synchronized (map) {
-      return map.size();
-    }
+    return map.size();
   }
 
   public Collection values() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
index eeefaee..db1a783 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
@@ -37,7 +37,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
  */
 public class HAContainerRegion implements HAContainerWrapper {
 
-  private Map map;
+  private Region map;
   
   private final Map<String, CacheClientProxy> haRegionNameToProxy;
   
@@ -83,9 +83,7 @@ public class HAContainerRegion implements HAContainerWrapper {
         return null;
       }
     }
-    else {
-      return null;
-    }
+    return null;
   }
 
   public String getName() {
@@ -121,23 +119,23 @@ public class HAContainerRegion implements HAContainerWrapper {
   }
 
   public Object get(Object key) {
-    ClientUpdateMessageImpl cum = (ClientUpdateMessageImpl)map.get(key);
-    if (cum != null) {
-      cum.setEventIdentifier(((HAEventWrapper)key).getEventId());
-      if (cum.hasCqs()) {
-        cum.setClientCqs(((HAEventWrapper)key).getClientCqs());
+    ClientUpdateMessageImpl msg = (ClientUpdateMessageImpl)map.get(key);
+    if (msg != null) {
+      msg.setEventIdentifier(((HAEventWrapper)key).getEventId());
+      if (msg.hasCqs()) {
+        msg.setClientCqs(((HAEventWrapper)key).getClientCqs());
       }
     }
-    return cum;
+    return msg;
   }
   
   public Object getEntry(Object key) {
     Region.Entry entry = ((Region)map).getEntry(key);
     if(entry != null) {
-      ClientUpdateMessageImpl cum = (ClientUpdateMessageImpl)entry.getValue();
-      cum.setEventIdentifier(((HAEventWrapper)key).getEventId());
-      if(cum.hasCqs()) {
-        cum.setClientCqs(((HAEventWrapper)key).getClientCqs());
+      ClientUpdateMessageImpl msg = (ClientUpdateMessageImpl)entry.getValue();
+      msg.setEventIdentifier(((HAEventWrapper)key).getEventId());
+      if(msg.hasCqs()) {
+        msg.setClientCqs(((HAEventWrapper)key).getClientCqs());
       }      
     }
     return entry;
@@ -155,6 +153,11 @@ public class HAContainerRegion implements HAContainerWrapper {
     return map.put(key, value);
   }
 
+  @SuppressWarnings("unchecked")
+  public Object putIfAbsent(Object key, Object value) {
+    return map.putIfAbsent(key, value);
+  }
+  
   public void putAll(Map t) {
     map.putAll(t);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerWrapper.java
index b0f3e45..d0b831e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerWrapper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerWrapper.java
@@ -42,5 +42,6 @@ public interface HAContainerWrapper extends Map {
   public Object removeProxy(String haRegionName);
   
   public CacheClientProxy getProxy(String haRegionName);
-  
+ 
+  public Object putIfAbsent(Object key, Object value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
old mode 100644
new mode 100755
index a9d5e6b..f8f4500
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java
@@ -239,9 +239,9 @@ public class HARegionQueue implements RegionQueue
   protected static volatile int messageSyncInterval = DEFAULT_MESSAGE_SYNC_INTERVAL;
   
   /**
-   * The underlying map (may hold reference to a Region or a HashMap) for this
-   * HARegionQueue instance (and also shared by all the HARegionQueue instances
-   * associated with the same CacheClientNotifier).
+   * The underlying map (may hold reference to a Region or a ConcurrentHashMap)
+   * for this HARegionQueue instance (and also shared by all the HARegionQueue
+   * instances associated with the same CacheClientNotifier).
    */
   protected Map haContainer;
 
@@ -1611,11 +1611,10 @@ public class HARegionQueue implements RegionQueue
    */
   private List getBatchAndUpdateThreadContext(int batchSize)      
   {
-    // TODO : Dinesh : verify whether (batchSize * 2) is needed or not
-    List batch = new ArrayList(batchSize * 2);
     Iterator itr = this.idsAvailable.iterator();
     int currSize = this.idsAvailable.size();
     int limit = currSize >= batchSize ? batchSize : currSize;
+    List batch = new ArrayList(limit);
 
     List peekedEventsThreadContext;
     if ((peekedEventsThreadContext = (List)HARegionQueue.peekedEventsContext.get()) == null) {
@@ -1921,12 +1920,13 @@ public class HARegionQueue implements RegionQueue
       try {
         if (event instanceof HAEventWrapper) {
           event = (Conflatable)this.haContainer.get(event);
+          
           if (event instanceof ClientUpdateMessage) {
             if (((ClientUpdateMessage) event).hasCqs() && ((ClientUpdateMessage) event).hasCqs(clientProxyID)) {
-              CqNameToOp cqNames = ((ClientUpdateMessage)event).getClientCq(clientProxyID);
+              CqNameToOp cqNames = ((ClientUpdateMessage) event).getClientCq(clientProxyID);
               if (cqNames != null) {
-                for (String cqName: cqNames.getNames()) {
-                  InternalCqQuery cq = ((InternalCqQuery)cqService.getClientCqFromServer(clientProxyID, cqName));
+                for (String cqName : cqNames.getNames()) {
+                  InternalCqQuery cq = ((InternalCqQuery) cqService.getClientCqFromServer(clientProxyID, cqName));
                   CqQueryVsdStats cqStats = cq.getVsdStats();
                   if (cq != null && cqStats != null) {
                     cqStats.incNumHAQueuedEvents(incrementAmount);
@@ -1936,10 +1936,9 @@ public class HARegionQueue implements RegionQueue
             }
           }
         }
-      }
-      catch (Exception e) {
-       //catch exceptions that arise due to maintaining cq stats
-        //as maintaining cq stats should not affect the system.
+      } catch (Exception e) {
+        // catch exceptions that arise due to maintaining cq stats
+        // as maintaining cq stats should not affect the system.
         if (logger.isTraceEnabled()) {
           logger.trace("Exception while maintaining cq events stats.", e);
         }
@@ -2104,7 +2103,7 @@ public class HARegionQueue implements RegionQueue
   {
     Map container = null;
     if (haRgnQType == HARegionQueue.BLOCKING_HA_QUEUE) {
-      container = new HAContainerMap(new HashMap());
+      container = new HAContainerMap(new ConcurrentHashMap());
     }
     else {
       // Should actually be HAContainerRegion, but ok if only JUnits using this
@@ -2201,7 +2200,7 @@ public class HARegionQueue implements RegionQueue
   {
     Map container = null;
     if (haRgnQType == HARegionQueue.BLOCKING_HA_QUEUE) {
-      container = new HAContainerMap(new HashMap());
+      container = new HAContainerMap(new ConcurrentHashMap());
     }
     else {
       // Should actually be HAContainerRegion, but ok if only JUnits using this
@@ -3726,9 +3725,41 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid,
           // authentic, i.e. it doesn't refer to the HAEventWrapper instance
           // in the haContainer, but to the one outside it.
           boolean entryFound;
-          Map.Entry entry = null;
-          synchronized (this.haContainer) {
-            entry = (Map.Entry)((HAContainerWrapper)this.haContainer)
+          //synchronized (this.haContainer) {
+          HAEventWrapper original = null;
+          do {
+            ClientUpdateMessageImpl old = (ClientUpdateMessageImpl) ((HAContainerWrapper) this.haContainer).putIfAbsent(haEventWrapper,
+                haEventWrapper.getClientUpdateMessage());
+            if (old != null) {
+              original = (HAEventWrapper) ((HAContainerWrapper) this.haContainer).getKey(haEventWrapper);
+              if (original == null) {
+                continue;
+              }
+              synchronized (original) {
+                // assert the entry is still present
+                if (((HAContainerWrapper) this.haContainer).getKey(original) != null) {
+                  original.incAndGetReferenceCount();
+                  addClientCQsAndInterestList(old, haEventWrapper, this.haContainer, this.regionName);
+                  haEventWrapper = original;
+                } else {
+                  original = null;
+                }
+              }
+            } else {
+              synchronized (haEventWrapper) {
+                haEventWrapper.incAndGetReferenceCount();
+                haEventWrapper.setHAContainer(this.haContainer);
+                if (!haEventWrapper.getPutInProgress()) {
+                  // This means that this is a GII'ed event. Hence we must
+                  // explicitly set 'clientUpdateMessage' to null.
+                  haEventWrapper.setClientUpdateMessage(null);
+                }
+                haEventWrapper.setIsRefFromHAContainer(true);
+              }
+              break;
+            }
+          } while (original == null);
+          /*  entry = (Map.Entry)((HAContainerWrapper)this.haContainer)
                 .getEntry(haEventWrapper);
             if (entry == null) {
               entryFound = false;
@@ -3739,7 +3770,7 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid,
               // Do not assign entry.getKey() to haEventWrapper right now.
               ((HAEventWrapper)entry.getKey()).incAndGetReferenceCount();
             }
-          }
+          }//haContainer synchronized ends
           if (entryFound) {
             addClientCQsAndInterestList(entry, haEventWrapper, haContainer,
                 regionName);
@@ -3752,7 +3783,7 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid,
               haEventWrapper.setClientUpdateMessage(null);
             }
             haEventWrapper.setIsRefFromHAContainer(true);
-          }
+          }*/
         }
       }
       // This has now been taken care of in AbstractRegionMap.initialImagePut()
@@ -3772,16 +3803,13 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid,
     }
   }
   
-  public static void addClientCQsAndInterestList(Map.Entry entry,
-      HAEventWrapper haEventWrapper, Map haContainer, String regionName) {
+  public static void addClientCQsAndInterestList(ClientUpdateMessageImpl msg, HAEventWrapper haEventWrapper, Map haContainer, String regionName) {
 
-    ClientProxyMembershipID proxyID = ((HAContainerWrapper)haContainer)
-        .getProxyID(regionName);
+    ClientProxyMembershipID proxyID = ((HAContainerWrapper) haContainer).getProxyID(regionName);
     if (haEventWrapper.getClientCqs() != null) {
       CqNameToOp clientCQ = haEventWrapper.getClientCqs().get(proxyID);
       if (clientCQ != null) {
-        ((ClientUpdateMessageImpl)entry.getValue()).addClientCqs(proxyID,
-            clientCQ);
+        msg.addClientCqs(proxyID, clientCQ);
       }
     }
     // if (haEventWrapper.getPutInProgress()) {
@@ -3790,15 +3818,11 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid,
 
     // This is a remote HAEventWrapper.
     // Add new Interested client lists.
-    ClientUpdateMessageImpl clientMsg = (ClientUpdateMessageImpl)haEventWrapper
-        .getClientUpdateMessage();
+    ClientUpdateMessageImpl clientMsg = (ClientUpdateMessageImpl) haEventWrapper.getClientUpdateMessage();
     if (clientMsg.isClientInterestedInUpdates(proxyID)) {
-      ((ClientUpdateMessageImpl)entry.getValue()).addClientInterestList(
-          proxyID, true);
-    }
-    else if (clientMsg.isClientInterestedInInvalidates(proxyID)) {
-      ((ClientUpdateMessageImpl)entry.getValue()).addClientInterestList(
-          proxyID, false);
+      msg.addClientInterestList(proxyID, true);
+    } else if (clientMsg.isClientInterestedInInvalidates(proxyID)) {
+      msg.addClientInterestList(proxyID, false);
     }
   }
   
@@ -3949,18 +3973,18 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid,
    */
   public Conflatable getAndRemoveFromHAContainer(Conflatable conflatable) 
   {
-    Conflatable cum = null;
+    Conflatable msg = null;
     if (conflatable instanceof HAEventWrapper) {
       HAEventWrapper wrapper = (HAEventWrapper)conflatable;
-      cum = (Conflatable)HARegionQueue.this.haContainer.get(wrapper);
-      if (cum != null) {
+      msg = (Conflatable)HARegionQueue.this.haContainer.get(wrapper);
+      if (msg != null) {
         decAndRemoveFromHAContainer(wrapper);
       }
     }
     else {
-      cum = conflatable;
+      msg = conflatable;
     }
-    return cum;
+    return msg;
   }
   
   /**
@@ -3977,7 +4001,7 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid,
   {
     if (wrapper.decAndGetReferenceCount() == 0L
         && !wrapper.getPutInProgress()) {
-      synchronized (this.haContainer) {
+      synchronized (wrapper) {
         if (wrapper.getReferenceCount() == 0L) {
           if (logger.isDebugEnabled()) {
             logger.debug("Removing event from {}: {}", this.region.getFullPath(), wrapper.getEventId());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/LockObject.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/LockObject.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/LockObject.java
index 612b71a..78107f2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/LockObject.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/LockObject.java
@@ -20,23 +20,33 @@ public class LockObject {
   public Object key;
   public long lockedTimeStamp;
   private boolean removed;
-    
+  private boolean waiting = false;
+
   public LockObject(Object key, long lockedTimeStamp) {
     this.key = key;
     this.lockedTimeStamp = lockedTimeStamp;
   }
-  
-  /**Always updated when the monitor is held on this object */
+
+  public void waiting() {
+    waiting = true;
+  }
+
+  public boolean isSomeoneWaiting() {
+    return waiting;
+  }
+
+  /** Always updated when the monitor is held on this object */
   public void setRemoved() {
     this.removed = true;
   }
 
-  /**Always checked when the monitor is held on this object */
+  /** Always checked when the monitor is held on this object */
   public boolean isRemoved() {
     return this.removed;
   }
-  
+
+  @Override
   public String toString() {
-    return "{LockObject="+key+"("+lockedTimeStamp+")}";
+    return "LockObject [key=" + key + ", lockedTimeStamp=" + lockedTimeStamp + ", removed=" + removed + ", waiting=" + waiting + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index d351569..edf3a94 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -1506,7 +1506,7 @@ public class CacheClientNotifier {
       if (!wrapper.getIsRefFromHAContainer()) {
         wrapper = (HAEventWrapper)haContainer.getKey(wrapper);
         if (wrapper != null && !wrapper.getPutInProgress()) {
-          synchronized (haContainer) {
+          synchronized (wrapper) {
             if (wrapper.getReferenceCount() == 0L) {
               if (logger.isDebugEnabled()) {
                 logger.debug("Removing event from haContainer: {}", wrapper);
@@ -1523,7 +1523,7 @@ public class CacheClientNotifier {
         // This wrapper resides in haContainer.
         wrapper.setClientUpdateMessage(null);
         wrapper.setPutInProgress(false);
-        synchronized (haContainer) {
+        synchronized (wrapper) {
           if (wrapper.getReferenceCount() == 0L) {
             if (logger.isDebugEnabled()) {
               logger.debug("Removing event from haContainer: {}", wrapper);
@@ -2699,7 +2699,7 @@ public class CacheClientNotifier {
               (Boolean)overflowAttributesList.get(4))));
     }
     else {
-      haContainer = new HAContainerMap(new HashMap());
+      haContainer = new HAContainerMap(new ConcurrentHashMap());
     }
     assert haContainer != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 0fb915e..cde0be3 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -58,6 +59,7 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.offheap.MemoryAllocatorImpl;
 
+
 /**
  * Class <code>ClientUpdateMessageImpl</code> is a message representing a cache
  * operation that is sent from a server to an interested client.
@@ -442,7 +444,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         // Notify all - do not send the value
         message = new Message(6, clientVersion);
         message.setMessageType(MessageType.LOCAL_INVALIDATE);
-        message.addStringPart(this._regionName);
+        message.addStringPart(this._regionName, true);
         // Currently serializing the key here instead of when the message
         // is put in the queue so that it can be conflated it later
         message.addStringOrObjPart(this._keyOfInterest);
@@ -461,7 +463,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         else {
           message.setMessageType(MessageType.LOCAL_UPDATE);
         }
-        message.addStringPart(this._regionName);
+        message.addStringPart(this._regionName, true);
         // Currently serializing the key here instead of when the message
         // is put in the queue so that it can be conflated it later
         message.addStringOrObjPart(this._keyOfInterest);
@@ -483,7 +485,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
       else {
         message.setMessageType(MessageType.LOCAL_INVALIDATE);
       }
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
 
       // Currently serializing the key here instead of when the message
       // is put in the queue so that it can be conflated it later
@@ -498,7 +500,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     } else if (isDestroyRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.LOCAL_DESTROY_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
       
@@ -509,7 +511,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isClearRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.CLEAR_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
       
@@ -520,7 +522,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isInvalidateRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.INVALIDATE_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
       
@@ -560,7 +562,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         message.setMessageType(MessageType.LOCAL_INVALIDATE);
 
         // Add the region name
-        message.addStringPart(this._regionName);
+        message.addStringPart(this._regionName, true);
 
         // Add the key
         // Currently serializing the key here instead of when the message
@@ -587,7 +589,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
           message.setMessageType(MessageType.LOCAL_CREATE);
 
           // Add the region name
-          message.addStringPart(this._regionName);
+          message.addStringPart(this._regionName, true);
 
           // Add the key
           // Currently serializing the key here instead of when the message
@@ -602,7 +604,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
           message.setMessageType(MessageType.LOCAL_UPDATE);
 
           // Add the region name
-          message.addStringPart(this._regionName);
+          message.addStringPart(this._regionName, true);
 
           // Add the key
           // Currently serializing the key here instead of when the message
@@ -657,7 +659,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         message.setMessageType(MessageType.LOCAL_INVALIDATE);
       }
 
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
 
       // Currently serializing the key here instead of when the message
       // is put in the queue so that it can be conflated later
@@ -674,7 +676,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isDestroyRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.LOCAL_DESTROY_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
 
@@ -685,7 +687,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isClearRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.CLEAR_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
 
@@ -696,7 +698,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isInvalidateRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.INVALIDATE_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
 
@@ -741,7 +743,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         message.setMessageType(MessageType.LOCAL_INVALIDATE);
 
         // Add the region name
-        message.addStringPart(this._regionName);
+        message.addStringPart(this._regionName, true);
 
         // Add the key
         // Currently serializing the key here instead of when the message
@@ -757,7 +759,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
           message.setMessageType(MessageType.LOCAL_CREATE);
 
           // Add the region name
-          message.addStringPart(this._regionName);
+          message.addStringPart(this._regionName, true);
 
           // Add the key
           // Currently serializing the key here instead of when the message
@@ -772,7 +774,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
           message.setMessageType(MessageType.LOCAL_UPDATE);
 
           // Add the region name
-          message.addStringPart(this._regionName);
+          message.addStringPart(this._regionName, true);
 
           // Add the key
           // Currently serializing the key here instead of when the message
@@ -826,7 +828,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         message = new Message(6 + cqMsgParts, clientVersion);
         message.setMessageType(MessageType.LOCAL_INVALIDATE);
       }
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addStringOrObjPart(this._keyOfInterest);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
@@ -844,7 +846,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isDestroyRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.LOCAL_DESTROY_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
 
@@ -855,7 +857,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isClearRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.CLEAR_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
       message.addObjPart(Boolean.valueOf(clientHasCq));
 
@@ -866,7 +868,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     else if (isInvalidateRegion()) {
       message = new Message(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.INVALIDATE_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
 
       // Add CQ status.
@@ -893,7 +895,6 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
       byte[] latestValue = p_latestValue;
       Message message = null;
       ClientProxyMembershipID proxyId = proxy.getProxyID();
-
       // Add CQ info.
       int cqMsgParts = 0;
       boolean clientHasCq = this._hasCqs && (this.getCqs(proxyId) != null);
@@ -910,17 +911,17 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
             cqMsgParts++; // To store base operation type for CQ.
           }
 
-          message = new Message(7 + cqMsgParts, clientVersion);
+          message = getMessage(7 + cqMsgParts, clientVersion);
           message.setMessageType(MessageType.LOCAL_INVALIDATE);
-          message.addStringPart(this._regionName);
+          message.addStringPart(this._regionName, true);
           message.addStringOrObjPart(this._keyOfInterest);
         }
         else {
           // Notify by subscription - send the value
-          message = new Message(9 + cqMsgParts, clientVersion);
+          message = getMessage(9 + cqMsgParts, clientVersion);
           if (isCreate()) {
             message.setMessageType(MessageType.LOCAL_CREATE);
-            message.addStringPart(this._regionName);
+            message.addStringPart(this._regionName, true);
             message.addStringOrObjPart(this._keyOfInterest);
             message.addObjPart(Boolean.FALSE); // NO delta
             // Add the value (which has already been serialized)
@@ -928,7 +929,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
           }
           else {
             message.setMessageType(MessageType.LOCAL_UPDATE);
-            message.addStringPart(this._regionName);
+            message.addStringPart(this._regionName, true);
             message.addStringOrObjPart(this._keyOfInterest);
 
             if (this.deltaBytes != null
@@ -972,17 +973,17 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
       }
       else if (isDestroy() || isInvalidate()) {
         if (isDestroy()) {
-          message = new Message(7 + cqMsgParts, clientVersion);
+          message = getMessage(7 + cqMsgParts, clientVersion);
           message.setMessageType(MessageType.LOCAL_DESTROY);
         }
         else {
           if (clientHasCq){
             cqMsgParts++;/* To store the region operation for CQ */
           }
-          message = new Message(7 + cqMsgParts, clientVersion);
+          message = getMessage(7 + cqMsgParts, clientVersion);
           message.setMessageType(MessageType.LOCAL_INVALIDATE);
         }
-        message.addStringPart(this._regionName);
+        message.addStringPart(this._regionName, true);
         message.addStringOrObjPart(this._keyOfInterest);
         message.addObjPart(this._callbackArgument);
         message.addObjPart(this.versionTag);
@@ -999,9 +1000,9 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
       }
     }
       else if (isDestroyRegion()) {
-        message = new Message(4 + cqMsgParts, clientVersion);
+        message = getMessage(4 + cqMsgParts, clientVersion);
         message.setMessageType(MessageType.LOCAL_DESTROY_REGION);
-        message.addStringPart(this._regionName);
+        message.addStringPart(this._regionName, true);
         message.addObjPart(this._callbackArgument);
         message.addObjPart(Boolean.valueOf(clientHasCq));
 
@@ -1010,9 +1011,9 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         }
       }
       else if (isClearRegion()) {
-        message = new Message(4 + cqMsgParts, clientVersion);
+        message = getMessage(4 + cqMsgParts, clientVersion);
         message.setMessageType(MessageType.CLEAR_REGION);
-        message.addStringPart(this._regionName);
+        message.addStringPart(this._regionName, true);
         message.addObjPart(this._callbackArgument);
         message.addObjPart(Boolean.valueOf(clientHasCq));
 
@@ -1021,9 +1022,9 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         }
       }
     else if (isInvalidateRegion()) {
-      message = new Message(4 + cqMsgParts, clientVersion);
+      message = getMessage(4 + cqMsgParts, clientVersion);
       message.setMessageType(MessageType.INVALIDATE_REGION);
-      message.addStringPart(this._regionName);
+      message.addStringPart(this._regionName, true);
       message.addObjPart(this._callbackArgument);
 
       // Add CQ status.
@@ -1044,6 +1045,22 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     return message;
   }
 
+  private static final ThreadLocal<Map<Integer,Message>> CACHED_MESSAGES = new ThreadLocal<Map<Integer,Message>>() {
+    protected Map<Integer,Message> initialValue() {
+      return new HashMap<Integer,Message>();
+    };
+  };
+
+  private Message getMessage(int numParts, Version clientVersion) {
+    Message m = CACHED_MESSAGES.get().get(numParts);
+    if (m == null) {
+      m =  new Message(numParts, Version.CURRENT);
+      CACHED_MESSAGES.get().put(numParts, m);
+    }
+    m.clearParts();
+    m.setVersion(clientVersion);
+    return m;
+  }
 
   /**
    * @return boolean true if the event is due to net load.
@@ -1088,6 +1105,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     String[] cqNames = null;  
     if (this._clientCqs != null) {
       CqNameToOp cqs = this._clientCqs.get(clientId);
+      
       if (cqs != null && !cqs.isEmpty()) {
         cqNames = cqs.getNames();
       }
@@ -1567,12 +1585,23 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
   public static class CqNameToOpSingleEntry implements CqNameToOp {
     private String name;
     private int op;
+  
+    private static final String[] EMPTY_NAMES_ARRAY = new String[0];
+    
+    private static Map<String,String[]> NAMES_ARRAY = new ConcurrentHashMap<String,String[]>();
     
     public CqNameToOpSingleEntry(String name, Integer op) {
-      this.name = name;
+      initializeName(name);
       this.op = op.intValue();
     }
-
+    
+    private void initializeName(String name) {
+      this.name = name;
+      if (!NAMES_ARRAY.containsKey(name)) {
+        NAMES_ARRAY.put(name, new String[]{name});
+      }
+    }
+    
     @Override
     public void sendTo(DataOutput out) throws IOException {
       // When serialized it needs to look just as if writeObject was called on a HASH_MAP
@@ -1593,7 +1622,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     @Override
     public void addToMessage(Message message) {
       if (!isEmpty()) {
-        message.addStringPart(this.name);
+        message.addStringPart(this.name, true);
         message.addIntPart(this.op);
       }
     }
@@ -1605,11 +1634,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
 
     @Override
     public String[] getNames() {
-      if (isEmpty()) {
-        return new String[0];
-      } else {
-        return new String[]{this.name};
-      }
+      return (isEmpty()) ? EMPTY_NAMES_ARRAY : NAMES_ARRAY.get(this.name);
     }
 
     @Override
@@ -1667,7 +1692,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
         Entry<String, Integer> entry = entries.next();
         // Add CQ Name.
         String cq = entry.getKey();
-        message.addStringPart(cq);
+        message.addStringPart(cq, true);
         // Add CQ Op.
         int op = entry.getValue().intValue();
         message.addIntPart(op);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index 459cf5f..13e6d22 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@ -39,6 +39,8 @@ import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -94,6 +96,27 @@ public class Message  {
   private static final int FIXED_LENGTH = 17;
 
   private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>();
+  
+  private static final byte[] TRUE;
+  private static final byte[] FALSE;
+
+  static {
+    try {
+      HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
+      BlobHelper.serializeTo(Boolean.TRUE, hdos);
+      TRUE = hdos.toByteArray();
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+
+    try {
+      HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
+      BlobHelper.serializeTo(Boolean.FALSE, hdos);
+      FALSE = hdos.toByteArray();
+    } catch (Exception e) {
+      throw new IllegalStateException(e);
+    }
+  }
 
   protected int msgType;
   protected int payloadLength=0;
@@ -242,18 +265,34 @@ public class Message  {
   }
 
   public void addStringPart(String str) {
+    addStringPart(str, false);
+  }
+  
+  private static final Map<String,byte[]> CACHED_STRINGS = new ConcurrentHashMap<String,byte[]>();
+  
+  public void addStringPart(String str, boolean enableCaching) {
     if (str==null) {
       addRawPart((byte[])null, false);
     }
     else {
-      HeapDataOutputStream hdos = new HeapDataOutputStream(str);
-      this.messageModified = true;
       Part part = partsList[this.currentPart];
-      part.setPartState(hdos, false);
+      if (enableCaching) {
+        byte[] bytes = CACHED_STRINGS.get(str);
+        if (bytes == null) {
+          HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+          bytes = hdos.toByteArray();
+          CACHED_STRINGS.put(str, bytes);
+        }
+        part.setPartState(bytes, false);
+      } else {
+        HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+        this.messageModified = true;
+        part.setPartState(hdos, false);
+      }
       this.currentPart++;
     }
   }
-
+  
   /*
    * Adds a new part to this message that contains a <code>byte</code>
    * array (as opposed to a serialized object).
@@ -295,9 +334,12 @@ public class Message  {
       serializeAndAddPartNoCopying(o);
     }
   }
+
   public void addObjPart(Object o, boolean zipValues) {
     if (o == null || o instanceof byte[]) {
-      addRawPart((byte[])o, false);
+      addRawPart((byte[]) o, false);
+    } else if (o instanceof Boolean) {
+      addRawPart((Boolean) o ? TRUE : FALSE, true);
     } else {
       serializeAndAddPart(o, zipValues);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
index 1c3819e..42b7662 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
@@ -23,6 +23,8 @@ import com.gemstone.gemfire.internal.offheap.StoredObject;
 import java.io.*;
 import java.nio.*;
 import java.nio.channels.*;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Represents one unit of information (essentially a <code>byte</code>
@@ -185,9 +187,16 @@ public class Part {
         | ((bytes[offset + 3]) & 0x000000FF);
   }
 
+  //TODO Check non-enum callers. Don't want to cache all ints, just known ones.
+  private static final Map<Integer,byte[]> CACHED_INTS = new ConcurrentHashMap<Integer,byte[]>();
+  
   public void setInt(int v) {
-    byte[] bytes = new byte[4];
-    encodeInt(v, bytes);
+    byte[] bytes = CACHED_INTS.get(v);
+    if (bytes == null) {
+      bytes = new byte[4];
+      encodeInt(v, bytes);
+      CACHED_INTS.put(v, bytes);
+    }
     this.typeCode = BYTE_CODE;
     this.part = bytes;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
----------------------------------------------------------------------
diff --git a/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
index 63a1321..7eee1aa 100644
--- a/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
@@ -1011,8 +1011,8 @@ fromData,33,2a2bb9001b0100b500072a2bb8001cc0001db500052a2bb8001cc0001eb50003b1
 toData,27,2b2ab40007b9001902002ab400052bb8001a2ab400032bb8001ab1
 
 com/gemstone/gemfire/internal/cache/FilterProfile,2
-fromData,210,bb012d59b7012e4d2c2bb8012f2a2cb500202ab4000d2bb80130b900300200572ab400052bb80131b9007602002ab400072bb80131b9007602002ab400092bb80131b9007602002ab4000e2bb80130b900300200572ab400062bb80131b9007602002ab400082bb80131b9007602002ab4000a2bb80131b9007602002bb801323e1d9e004f05b80134360403360515051da2002c2bb801353a062bb801363a072a1906190703b601372ab4000f19061907b90054030057840501a7ffd41504b8013457a7000e3a081504b80134571908bfb1
-toData,181,2ab40020c0012d2bb801392ab4000d2ab40023b401142bb8013a2ab400052bb8013b2ab400072bb8013b2ab400092bb8013b2ab4000e2ab40023b401142bb8013a2ab400062bb8013b2ab400082bb8013b2ab4000a2bb8013b2ab4000f4d2cb900b501003e1d2bb8013c2cb900ac0100b900ad01003a041904b9006f01009900361904b900700100c000ae3a051905b901170100c0003c3a061905b900af0100c000823a0719062bb8013d19072bb80139a7ffc6b1
+fromData,210,bb013159b701324d2c2bb801332a2cb500202ab4000d2bb80134b900300200572ab400052bb80135b9007802002ab400072bb80135b9007802002ab400092bb80135b9007802002ab4000e2bb80134b900300200572ab400062bb80135b9007802002ab400082bb80135b9007802002ab4000a2bb80135b9007802002bb801363e1d9e004f05b80138360403360515051da2002c2bb801393a062bb8013a3a072a1906190703b6013b2ab4000f19061907b90054030057840501a7ffd41504b8013857a7000e3a081504b80138571908bfb1
+toData,181,2ab40020c001312bb8013d2ab4000d2ab40023b401182bb8013e2ab400052bb8013f2ab400072bb8013f2ab400092bb8013f2ab4000e2ab40023b401182bb8013e2ab400062bb8013f2ab400082bb8013f2ab4000a2bb8013f2ab4000f4d2cb900b901003e1d2bb801402cb900ae0100b900af01003a041904b900b001009900361904b900b10100c000b23a051905b9011b0100c0003c3a061905b900b30100c000843a0719062bb8014119072bb8013da7ffc6b1
 
 com/gemstone/gemfire/internal/cache/FilterProfile$OperationMessage,2
 fromData,129,2a2bb700512a2bb900520100b500092a2bb900530100b500402ab800542bb90055010032b500222a2bb900560100b500282a2bb900570100b500492ab40022b8004b99002c2a2bb900530100b500312ab40022b2004da5000d2ab40022b2004ea600202a2bb80058b50032a700152a2bb900570100b500242a2bb80059b50026b1
@@ -1026,9 +1026,9 @@ toDataPre_GFE_7_1_0_0,88,2ab40006b600113d2b1cb9002402002ab40006b60016b9001701004
 
 com/gemstone/gemfire/internal/cache/FilterRoutingInfo$FilterInfo,4
 fromData,9,2a2bb80014b50015b1
-fromDataPre_GFE_8_0_0_0,50,b800249900162a2bb80025b500262a2bb80014b50015a7001b2a2bb80027b500012a2bb80028b5000e2a2bb80028b50012b1
-toData,149,bb0016591103e801b700174d2ab40001c7000b2c03b60018a7005a2c04b600182ab40001b600192cb8001a2ab40001b60006b9000701004e2db9000801009900342db900090100c0000a3a041904b9000b0100c0001bb6001c2cb8001d1904b9000c0100c0001eb6001f852cb8001da7ffc92ab4000e2ab400202cb800212ab400122ab400202cb800212cb600224e2d2bb80023b1
-toDataPre_GFE_8_0_0_0,193,b8002499009dbb0016591103e82bb80029b700174d2ab40001c7000b2c03b60018a7005a2c04b600182ab40001b600192cb8001a2ab40001b60006b9000701004e2db9000801009900342db900090100c0000a3a041904b9000b0100c0001bb6001c2cb8001d1904b9000c0100c0001eb6001f852cb8001da7ffc92ab4000e2ab400202cb800212ab400122ab400202cb800212cb600224e2d2bb80023a700232ab400012bb8002a2ab4000e2ab400202bb800212ab400122ab400202bb80021b1
+fromDataPre_GFE_8_0_0_0,50,b800259900162a2bb80026b500272a2bb80014b50015a7001b2a2bb80028b500012a2bb80029b5000e2a2bb80029b50012b1
+toData,235,10093e1d2ab4000ec7000707a700112ab4000eb9001601001008680860603e1d2ab40012c7000707a700112ab40012b9001601001008680860603e1d2ab40001c7000703a7000d2ab40001b60017100c68603ebb0018591d01b700194d2ab40001c7000b2c03b6001aa7005d2c04b6001a2ab40001b600172cb8001b2ab40001b60006b9000701003a041904b9000801009900351904b900090100c0000a3a051905b9000b0100c0001cb6001d2cb8001e1905b9000c0100c0001fb60020852cb8001ea7ffc72ab4000e2ab400212cb800222ab400122ab400212cb800222cb600233a0419042bb80024b1
+toDataPre_GFE_8_0_0_0,193,b8002599009dbb0018591103e82bb8002ab700194d2ab40001c7000b2c03b6001aa7005a2c04b6001a2ab40001b600172cb8001b2ab40001b60006b9000701004e2db9000801009900342db900090100c0000a3a041904b9000b0100c0001cb6001d2cb8001e1904b9000c0100c0001fb60020852cb8001ea7ffc92ab4000e2ab400212cb800222ab400122ab400212cb800222cb600234e2d2bb80024a700232ab400012bb8002b2ab4000e2ab400212bb800222ab400122ab400212bb80022b1
 
 com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor$FindDurableQueueMessage,2
 fromData,24,2a2bb7001e2a2bb9001f0100b500052a2bb80020b50006b1
@@ -1912,8 +1912,8 @@ fromData,63,2ab800222bb90023010032b500072a2bb900230100b80024b5001c2a2bb80025b600
 toData,59,2b2ab40007b60016b9001b02002b2ab4001cb6001db9001b02002ab600142bb8001e2ab400082bb8001f2ab400202bb8001f2ab400212bb8001fb1
 
 com/gemstone/gemfire/internal/cache/tier/sockets/ClientUpdateMessageImpl,2
-fromData,171,2a2bb9009a0100b8009bb500082a2bb8009cb5000a2a2bb80078b5000b2a2bb8009db5000c2a2bb9009a0100b5000d2a2bb8009eb5000f2a2bb9009f0100b500032a2bb9009f0100b500982a2bb8009db500072a2bb9009f0100b500042a2bb80078b5000eb800a04d2bb800a14e2cc600102dc6000c2c2db600a2c000704e2a2db5006e2bb800a14e2cc600102dc6000c2c2db600a2c000704e2a2db500372a2bb80078c000a3b50013b1
-toData,162,2b2ab40008b60092b9009302002ab4000a2bb800942ab4000b2bb800952ab4000cc100559900142ab4000cc00055c000552bb80096a7000e2ab4000cb800232bb800962b2ab4000db9009302002ab4000f2bb800952b2ab40003b9009702002b2ab40098b9009702002ab400072bb800962b2ab40004b9009702002ab4000e2bb800952ab4006ec000702bb800992ab40037c000702bb800992ab400132bb80095b1
+fromData,171,2a2bb900a40100b800a5b500082a2bb800a6b5000a2a2bb80082b5000b2a2bb800a7b5000c2a2bb900a40100b5000d2a2bb800a8b5000f2a2bb900a90100b500032a2bb900a90100b500a22a2bb800a7b500072a2bb900a90100b500042a2bb80082b5000eb800aa4d2bb800ab4e2cc600102dc6000c2c2db600acc0007a4e2a2db500782bb800ab4e2cc600102dc6000c2c2db600acc0007a4e2a2db500372a2bb80082c000adb50013b1
+toData,162,2b2ab40008b6009cb9009d02002ab4000a2bb8009e2ab4000b2bb8009f2ab4000cc100559900142ab4000cc00055c000552bb800a0a7000e2ab4000cb800232bb800a02b2ab4000db9009d02002ab4000f2bb8009f2b2ab40003b900a102002b2ab400a2b900a102002ab400072bb800a02b2ab40004b900a102002ab4000e2bb8009f2ab40078c0007a2bb800a32ab40037c0007a2bb800a32ab400132bb8009fb1
 
 com/gemstone/gemfire/internal/cache/tier/sockets/HAEventWrapper,2
 fromData,432,2bb8003c9901862a2bb8003dc00037b5000d2abb001059b7003eb500052ab400052bb8003f2ab40005c000102ab4000db600402ab40005b90041010099011c2bb800423e1d02a00008014da700fabb0043591d0c04b700444d03360415041da200e62bb8003dc000133a052bb900450100360715071043a000922bb800423608150802a0000dbb0047591248b70049bf150804a000252bb8003dc0004a3a092bb8003dc0004b3a0abb004c591909190ab7004d3a06a7005115089a0014bb004c590103b8004eb7004d3a06a7003bbb004f591508b700503a0603360915091508a200262bb8003dc0004a3a0a2bb8003dc0004b3a0b1906190a190bb900510300840901a7ffd9a7003015071029a0000dbb0047591248b70049bfbb004759bb001b59b7001c1252b6001e1507b60053b60024b70049bf2c19051906b6005457840401a7ff1a2a2cb500062ab40005c000102ab40006b600552a2ab40005b900070100b500082a2ab40005b900090100b5000a2a2ab40005b9000b0100b50002b2000e2a09b6000fa700282bb8003d57bb001059b7003e2bb8003fb20056b90057010099000db200561258b900590200b1

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java b/geode-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java
index a691a14..eae1906 100644
--- a/geode-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java
+++ b/geode-cq/src/main/java/com/gemstone/gemfire/cache/query/internal/cq/CqServiceImpl.java
@@ -40,6 +40,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import org.apache.logging.log4j.Logger;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -54,6 +55,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 public final class CqServiceImpl implements CqService  {
   private static final Logger logger = LogService.getLogger();
   
+  private static final Integer MESSAGE_TYPE_LOCAL_CREATE = Integer.valueOf(MessageType.LOCAL_CREATE); 
+  private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = Integer.valueOf(MessageType.LOCAL_UPDATE); 
+  private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = Integer.valueOf(MessageType.LOCAL_DESTROY); 
+  private static final Integer MESSAGE_TYPE_EXCEPTION = Integer.valueOf(MessageType.EXCEPTION);
+  
   /** 
    * System property to evaluate the query even though the initial results are not required
    * when cq is executed using the execute() method. 
@@ -94,7 +100,7 @@ public final class CqServiceImpl implements CqService  {
   
   // Map to manage the similar CQs (having same query - performance optimization).
   // With query as key and Set of CQs as values.
-  private final HashMap<String, HashSet<String>> matchingCqMap;
+  private final ConcurrentHashMap matchingCqMap;
 
   // CQ Service statistics
   public final CqServiceStatisticsImpl cqServiceStats;
@@ -127,7 +133,7 @@ public final class CqServiceImpl implements CqService  {
 
     
     // Initialize the Map which maintains the matching cqs.
-    this.matchingCqMap = new HashMap<String, HashSet<String>>();
+    this.matchingCqMap = new ConcurrentHashMap<String, HashSet<String>>();
     
     // Initialize the VSD statistics
     StatisticsFactory factory = cache.getDistributedSystem();
@@ -928,18 +934,33 @@ public final class CqServiceImpl implements CqService  {
     this.isRunning = true;
   }
   
+  private static final ConcurrentHashMap<String, ConcurrentHashMap<ClientProxyMembershipID, String>> serverCqNameCache = new ConcurrentHashMap<>();
+
   /**
    * @return Returns the serverCqName.
    */
-  @Override
   public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
-    String cName = null;
-    if (clientProxyId.isDurable()) {
-      cName = cqName + "__" + clientProxyId.getDurableId();
+    ConcurrentHashMap<ClientProxyMembershipID, String> cache = serverCqNameCache.get(cqName);
+    if (null == cache) {
+      final ConcurrentHashMap<ClientProxyMembershipID, String> old = serverCqNameCache.putIfAbsent(cqName,
+          cache = new ConcurrentHashMap<ClientProxyMembershipID, String>());
+      if (null != old) {
+        cache = old;
+      }
     }
-    else {
-      cName = cqName + "__" + clientProxyId.getDSMembership();
+
+    String cName = cache.get(clientProxyId);
+    if (null == cName) {
+      final StringBuilder sb = new StringBuilder(cqName).append("__");
+      if (clientProxyId.isDurable()) {
+        sb.append(clientProxyId.getDurableId());
+      } else {
+        sb.append(clientProxyId.getDSMembership());
+      }
+      cName = sb.toString();
+      cache.put(clientProxyId, cName);
     }
+
     return cName;
   }
   
@@ -1058,7 +1079,6 @@ public final class CqServiceImpl implements CqService  {
     if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
       return;
     }
-    
     // invoke CQ Listeners.
     CqListener[] cqListeners = cQuery.getCqAttributes().getCqListeners();
     
@@ -1406,15 +1426,17 @@ public final class CqServiceImpl implements CqService  {
           }
         } else {
           boolean error = false;
-          synchronized (cQuery) {
+          //synchronized (cQuery) 
+          {
             try {
-              // Apply query on new value.
-              if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
-                executionStartTime = this.stats.startCqQueryExecution();
+              synchronized (cQuery) {
+                // Apply query on new value.
+                if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
+                  executionStartTime = this.stats.startCqQueryExecution();
 
-                b_cqResults_newValue = evaluateQuery(cQuery, 
-                    new Object[] {cqUnfilteredEventsSet_newValue});
-                this.stats.endCqQueryExecution(executionStartTime);
+                  b_cqResults_newValue = evaluateQuery(cQuery, new Object[] { cqUnfilteredEventsSet_newValue });
+                  this.stats.endCqQueryExecution(executionStartTime);
+                }
               }
 
               // In case of Update, destroy and invalidate.
@@ -1446,15 +1468,16 @@ public final class CqServiceImpl implements CqService  {
                     }
                   }
                   
-                  // Apply query on old value.
-                  if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
-                    executionStartTime = this.stats.startCqQueryExecution();
-                    b_cqResults_oldValue = evaluateQuery(cQuery, 
-                        new Object[] {cqUnfilteredEventsSet_oldValue});
-                    this.stats.endCqQueryExecution(executionStartTime);
-                  } else {
-                    if (isDebugEnabled) {
-                      logger.debug("old value for event with key {} is null - query execution not performed", eventKey);
+                  synchronized (cQuery) {
+                    // Apply query on old value.
+                    if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
+                      executionStartTime = this.stats.startCqQueryExecution();
+                      b_cqResults_oldValue = evaluateQuery(cQuery, new Object[] { cqUnfilteredEventsSet_oldValue });
+                      this.stats.endCqQueryExecution(executionStartTime);
+                    } else {
+                      if (isDebugEnabled) {
+                        logger.debug("old value for event with key {} is null - query execution not performed", eventKey);
+                      }
                     }
                   }
                 } // Query oldValue
@@ -1472,14 +1495,14 @@ public final class CqServiceImpl implements CqService  {
             }
 
             if (error) {
-              cqEvent = Integer.valueOf(MessageType.EXCEPTION); 
+              cqEvent = MESSAGE_TYPE_EXCEPTION; 
             } 
             else { 
               if (b_cqResults_newValue) {
                 if (b_cqResults_oldValue) {
-                  cqEvent = Integer.valueOf(MessageType.LOCAL_UPDATE);
+                  cqEvent = MESSAGE_TYPE_LOCAL_UPDATE;
                 } else {
-                  cqEvent = Integer.valueOf(MessageType.LOCAL_CREATE);
+                  cqEvent = MESSAGE_TYPE_LOCAL_CREATE;
                 }
                 // If its create and caching is enabled, cache the key 
                 // for this CQ.
@@ -1488,7 +1511,7 @@ public final class CqServiceImpl implements CqService  {
                 // Base invalidate operation is treated as destroy.
                 // When the invalidate comes through, the entry will no longer 
                 // satisfy the query and will need to be deleted.
-                cqEvent = Integer.valueOf(MessageType.LOCAL_DESTROY);
+                cqEvent = MESSAGE_TYPE_LOCAL_DESTROY;
                 // If caching is enabled, mark this event's key as removed 
                 // from the CQ cache.
                 cQuery.markAsDestroyedInCqResultKeys(eventKey);
@@ -1497,9 +1520,9 @@ public final class CqServiceImpl implements CqService  {
           } //end synchronized(cQuery)
 
           // Get the matching CQs if any.
-          synchronized (this.matchingCqMap){
+          //synchronized (this.matchingCqMap){
             String query = cQuery.getQueryString();
-            HashSet matchingCqs = matchingCqMap.get(query);
+            Set matchingCqs = (Set)matchingCqMap.get(query);
             if (matchingCqs != null) {
               Iterator iter = matchingCqs.iterator();
               while (iter.hasNext()) {
@@ -1512,7 +1535,7 @@ public final class CqServiceImpl implements CqService  {
                 }
               }
             }
-          }
+          //}
         }
 
         if (cqEvent != null && cQuery.isRunning()){
@@ -1870,13 +1893,13 @@ public final class CqServiceImpl implements CqService  {
   public void addToMatchingCqMap(CqQueryImpl cq) {
     synchronized(this.matchingCqMap){
       String cqQuery = cq.getQueryString();
-      HashSet<String> matchingCQs = null;
+      Set<String> matchingCQs = null;
       if (!matchingCqMap.containsKey(cqQuery)){
-        matchingCQs = new HashSet<String>();
+        matchingCQs = Collections.newSetFromMap(new ConcurrentHashMap());
         matchingCqMap.put(cqQuery, matchingCQs);
         this.stats.incUniqueCqQuery();
       } else {
-        matchingCQs = matchingCqMap.get(cqQuery);
+        matchingCQs = (Set)matchingCqMap.get(cqQuery);
       }
       matchingCQs.add(cq.getServerCqName());
       if (logger.isDebugEnabled()) {
@@ -1893,7 +1916,7 @@ public final class CqServiceImpl implements CqService  {
     synchronized(this.matchingCqMap){
       String cqQuery = cq.getQueryString();
       if (matchingCqMap.containsKey(cqQuery)){
-    	  HashSet matchingCQs = matchingCqMap.get(cqQuery);
+    	  Set matchingCQs = (Set)matchingCqMap.get(cqQuery);
         matchingCQs.remove(cq.getServerCqName());
         if (logger.isDebugEnabled()) {
           logger.debug("Removing CQ from MatchingCQ map, CQName: {} Number of matched querys are: {}", cq.getServerCqName(), matchingCQs.size());
@@ -1910,7 +1933,7 @@ public final class CqServiceImpl implements CqService  {
    * Returns the matching CQ map.
    * @return HashMap matchingCqMap
    */
-  public HashMap<String, HashSet<String>> getMatchingCqMap(){
+  public Map<String, HashSet<String>> getMatchingCqMap(){
     return matchingCqMap; 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java
index ef9e61b..1057571 100644
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfDUnitTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.junit.experimental.categories.Category;
 
@@ -995,7 +996,7 @@ public class CqPerfDUnitTest extends JUnit4CacheTestCase {
           Assert.fail ("Failed to get the internal CqService.", ex);
         }
 
-        HashMap matchedCqMap = cqService.getMatchingCqMap();
+        Map matchedCqMap = cqService.getMatchingCqMap();
         assertEquals("The number of matched cq is not as expected.", mapSize, matchedCqMap.size());
 
         if (query != null) {        

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0e8e5945/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java
index dcd3915..3c7a059 100644
--- a/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqPerfUsingPoolDUnitTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.junit.experimental.categories.Category;
 
@@ -952,7 +953,7 @@ public class CqPerfUsingPoolDUnitTest extends JUnit4CacheTestCase {
           Assert.fail ("Failed to get the internal CqService.", ex);
         }
 
-        HashMap matchedCqMap = cqService.getMatchingCqMap();
+        Map matchedCqMap = cqService.getMatchingCqMap();
         assertEquals("The number of matched cq is not as expected.", mapSize, matchedCqMap.size());
 
         if (query != null) {        


Mime
View raw message