geode-commits mailing list archives

From dschnei...@apache.org
Subject [19/53] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Mon, 06 Jul 2015 18:15:26 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
index f42cc08..1d61efa 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -14,6 +14,7 @@ import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -36,6 +37,8 @@ import com.gemstone.gemfire.cache.query.QueryException;
 import com.gemstone.gemfire.cache.query.QueryExecutionLowMemoryException;
 import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
 import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.internal.CompiledGroupBySelect;
 import com.gemstone.gemfire.cache.query.internal.CompiledID;
 import com.gemstone.gemfire.cache.query.internal.CompiledIndexOperation;
 import com.gemstone.gemfire.cache.query.internal.CompiledIteratorDef;
@@ -45,17 +48,28 @@ import com.gemstone.gemfire.cache.query.internal.CompiledPath;
 import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
 import com.gemstone.gemfire.cache.query.internal.CompiledSortCriterion;
 import com.gemstone.gemfire.cache.query.internal.CompiledValue;
+import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
 import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
+import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults.Metadata;
 import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver.IndexInfo;
+import com.gemstone.gemfire.cache.query.internal.NWayMergeResults;
+import com.gemstone.gemfire.cache.query.internal.OrderByComparator;
 import com.gemstone.gemfire.cache.query.internal.PRQueryTraceInfo;
 import com.gemstone.gemfire.cache.query.internal.QueryExecutionContext;
 import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
 import com.gemstone.gemfire.cache.query.internal.ResultsBag;
+import com.gemstone.gemfire.cache.query.internal.ResultsSet;
 import com.gemstone.gemfire.cache.query.internal.RuntimeIterator;
+import com.gemstone.gemfire.cache.query.internal.SortedResultsBag;
+import com.gemstone.gemfire.cache.query.internal.SortedStructBag;
 import com.gemstone.gemfire.cache.query.internal.StructBag;
 import com.gemstone.gemfire.cache.query.internal.StructImpl;
+import com.gemstone.gemfire.cache.query.internal.StructSet;
+import com.gemstone.gemfire.cache.query.internal.parse.OQLLexerTokenTypes;
+import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl;
+import com.gemstone.gemfire.cache.query.internal.utils.PDXUtils;
 import com.gemstone.gemfire.cache.query.types.ObjectType;
 import com.gemstone.gemfire.cache.query.types.StructType;
 import com.gemstone.gemfire.distributed.DistributedMember;
@@ -66,6 +80,7 @@ import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.partitioned.QueryMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.StreamingPartitionOperation;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -147,7 +162,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
   private static final int MAX_PR_QUERY_RETRIES = Integer.getInteger("gemfire.MAX_PR_QUERY_RETRIES", 10).intValue();
 
   private final PartitionedRegion pr;
-  private volatile Map node2bucketIds;
+  private volatile Map<InternalDistributedMember,List<Integer>> node2bucketIds;
   private final DefaultQuery query;
   private final Object[] parameters;
   private SelectResults cumulativeResults;
@@ -184,7 +199,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     this.bucketsToQuery = bucketsToQuery;
     this.successfulBuckets = new IntOpenHashSet(this.bucketsToQuery.size());
     this.resultsPerMember = new ConcurrentHashMap<InternalDistributedMember, Collection<Collection>>();
-    this.node2bucketIds = Collections.EMPTY_MAP;
+    this.node2bucketIds = Collections.emptyMap();
     if (query != null && query.isTraced()) {
       prQueryTraceInfoList = new ConcurrentLinkedQueue();
     }
@@ -222,6 +237,19 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
   @Override  
   protected boolean processData(List objects, InternalDistributedMember sender,
                                 int sequenceNum, boolean lastInSequence) {
+    // Check if the sender is pre GFE_90. In that case the results coming from it are not sorted,
+    // so we will have to sort them here.
+    boolean sortNeeded = false;
+    List<CompiledSortCriterion> orderByAttribs = null;
+    if(sender.getVersionObject().compareTo(Version.GFE_90) < 0 ) {
+      CompiledSelect cs = this.query.getSimpleSelect();
+      if(cs != null && cs.isOrderBy()) {
+        sortNeeded = true;
+        orderByAttribs = cs.getOrderByAttrs();
+      }
+      
+      
+    }
     Collection results = this.resultsPerMember.get(sender);
     if (results == null) {
       synchronized (this.resultsPerMember) {
@@ -254,10 +282,13 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     if (logger.isDebugEnabled()) {
       logger.debug("Results per member, for {} size: {}", sender, objects.size());
     }
+    if(sortNeeded) {
+      objects = sortIncomingData(objects, orderByAttribs);
+    }
 
     synchronized (results) {
-      if (!QueryMonitor.isLowMemory()) {
-        results.add(objects);
+      if (!QueryMonitor.isLowMemory()) {        
+          results.add(objects);        
       } else {
         if (logger.isDebugEnabled()) {
           logger.debug("query canceled while gathering results, aborting");
@@ -291,6 +322,39 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     */
     return true;
   }
+
+  //TODO Asif: optimize this by creating a sorted SelectResults object at the time of fromData, so
+  // that processData already receives ordered data.
+  private List sortIncomingData(List objects,
+      List<CompiledSortCriterion> orderByAttribs) {
+    ObjectType resultType = cumulativeResults.getCollectionType().getElementType();
+    ExecutionContext local = new ExecutionContext(null, this.pr.cache);
+    Comparator comparator = new OrderByComparator(orderByAttribs, resultType, local);
+    boolean nullAtStart = !orderByAttribs.get(0).getCriterion();
+    final SelectResults newResults; 
+    //Asif: There is a bug in versions < 9.0 such that the struct results coming from the
+    // bucket nodes do not contain appropriate ObjectTypes: all the projection fields
+    // have their types as ObjectType. The result set created here has the correct, more selective type,
+    // so adding the struct objects directly throws an exception due to the type mismatch. To handle this,
+    // instead of adding the struct objects as is, add their fieldValues.
+    if(resultType != null && resultType.isStructType() )  {
+      SortedStructBag sortedStructBag = new SortedStructBag(comparator, (StructType) resultType, 
+          nullAtStart);
+      for(Object o : objects) {
+        Struct s = (Struct)o;
+        sortedStructBag.addFieldValues(s.getFieldValues());
+      }
+      newResults = sortedStructBag;
+    }else {
+      newResults = new SortedResultsBag(comparator,resultType, nullAtStart);
+      newResults.addAll(objects) ;
+    }
+        
+   
+    objects = newResults.asList();
+    return objects;
+  }
+  
   
   /**
     * Returns normally if succeeded to get data, otherwise throws an exception
@@ -305,7 +369,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       throw new InterruptedException();
     }
 
-    HashMap n2b = new HashMap(this.node2bucketIds);
+    HashMap<InternalDistributedMember,List<Integer>> n2b = new HashMap<InternalDistributedMember,List<Integer>>(this.node2bucketIds);
     n2b.remove(this.pr.getMyId());
     // Shobhit: IF query is originated from a Function and we found some buckets on
     // remote node we should throw exception mentioning data movement during function execution.
@@ -333,10 +397,10 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         // send separate message to each recipient since each one has a
         // different list of bucket ids
         processor = new StreamingQueryPartitionResponse(this.sys, n2b.keySet());
-        for (Iterator itr = n2b.entrySet().iterator(); itr.hasNext();) {
-          Map.Entry me = (Map.Entry) itr.next();
-          final InternalDistributedMember rcp = (InternalDistributedMember) me.getKey();
-          final List bucketIds = (List) me.getValue();
+        for (Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> itr = n2b.entrySet().iterator(); itr.hasNext();) {
+          Map.Entry<InternalDistributedMember , List<Integer>> me =  itr.next();
+          final InternalDistributedMember rcp =  me.getKey();
+          final List<Integer> bucketIds =  me.getValue();
           DistributionMessage m = createRequestMessage(rcp, processor, bucketIds);
           Set notReceivedMembers = this.sys.getDistributionManager().putOutgoing(m);
           if (th != null) {
@@ -450,7 +514,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
    * that cause bucket data to be omitted from the results.
    * @throws InterruptedException
    */
-  public void queryBuckets(final TestHook th) throws QueryException, InterruptedException {
+  public SelectResults queryBuckets(final TestHook th) throws QueryException, InterruptedException {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
     if (Thread.interrupted()) {
@@ -539,7 +603,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       */
     }
 
-    addResultsToResultSet();
+    return addResultsToResultSet();
   }
   
   /**
@@ -595,10 +659,10 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
   
   
   private Set<Integer> caclulateRetryBuckets() {
-    Iterator memberToBucketList = node2bucketIds.entrySet().iterator();
+    Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> memberToBucketList = node2bucketIds.entrySet().iterator();
     final HashSet<Integer> retryBuckets = new HashSet<Integer>();
     while (memberToBucketList.hasNext()) {
-      Map.Entry<InternalDistributedMember, ArrayList<Integer>> e = (Map.Entry)memberToBucketList.next();
+      Map.Entry<InternalDistributedMember, List<Integer>> e = memberToBucketList.next();
       InternalDistributedMember m = e.getKey();
       if (!this.resultsPerMember.containsKey(m)
           || (!((MemberResultsList) this.resultsPerMember.get(m))
@@ -622,58 +686,93 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     return retryBuckets;
   }
 
-  private void addResultsToResultSet() throws QueryException {
+  private SelectResults addResultsToResultSet() throws QueryException {
     int numElementsInResult = 0;
-    boolean isStructBag = false;
-    boolean isResultBag = false;
-
+    
     boolean isDistinct = false;
     boolean isCount = false;
+
+    int limit = -1; // -1 indicates no limit was specified in the query
+    // passed as null. Not sure if it can happen in a real-life situation.
+    // So instead of modifying the test, using a null check in the constructor.
+    CompiledSelect cs = null;
+
    
-    int limit = -1; //-1 indicates no limit wsa specified in the query
-    //passed as null. Not sure if it can happen in real life situation.
-    //So instead of modifying test , using a null check in constructor
-    CompiledSelect cs =null;
-    
-    // Indicates whether to check for PdxInstance and convert them to 
-    // domain object. 
-    // In case of local queries the domain objects are stored in the result set
-    // for client/server queries PdxInstance are stored in result set. 
-    boolean getDomainObjectForPdx;
-    //indicated results from remote nodes need to be deserialized
-    //for local queries
-    boolean getDeserializedObject = false;
-    
-    if(this.query != null) {
-      cs =  this.query.getSimpleSelect();
+
+    if (this.query != null) {
+      cs = this.query.getSimpleSelect();
       limit = this.query.getLimit(parameters);
-      isDistinct = (cs != null)? cs.isDistinct():true;
-      isCount = (cs != null)? cs.isCount():false;
+      isDistinct = (cs != null) ? cs.isDistinct() : true;
+      isCount = (cs != null) ? cs.isCount() : false;
     }
-    
+
     if (isCount && !isDistinct) {
       addTotalCountForMemberToResults(limit);
-      return;
-    } 
-    
-    if (this.cumulativeResults instanceof StructBag) {
-    	isStructBag = true;
-    } else if (this.cumulativeResults instanceof ResultsBag) {
-    	isResultBag = true;
-    	//TODO:Asif: Idealy the isOrdered should  be the sufficient condtion. Remove the orderbyAttribs null check
-    } else if (this.cumulativeResults.getCollectionType().isOrdered() && cs.getOrderByAttrs() != null) {
+      return this.cumulativeResults;
+    }
+
+    boolean isGroupByResults = cs.getType() == CompiledValue.GROUP_BY_SELECT;
+    if(isGroupByResults) {
+      SelectResults baseResults = null;
+      CompiledGroupBySelect cgs = (CompiledGroupBySelect) cs;
+      if(cgs.getOrderByAttrs() != null && !cgs.getOrderByAttrs().isEmpty()) {
+        baseResults = this.buildSortedResult(cs, limit);        
+      }else {
+        baseResults = this.buildCumulativeResults(isDistinct, limit);
+      }
+      ExecutionContext context = new ExecutionContext(null, pr.cache);
+      context.setIsPRQueryNode(true);
+      return cgs.applyAggregateAndGroupBy(baseResults, context);
+    }else {
+
+      if (this.cumulativeResults.getCollectionType().isOrdered()
+        && cs.getOrderByAttrs() != null) {
       // If its a sorted result set, sort local and remote results using query.
-      buildSortedResult(cs, limit);
-      return;
+        return buildSortedResult(cs, limit);        
+      }else {  
+        return buildCumulativeResults(isDistinct, limit);
+      }
     }
+  }
 
+  private SelectResults buildCumulativeResults(boolean isDistinct, int limit) {
+    // Indicates whether to check for PdxInstances and convert them to
+    // domain objects.
+    // In case of local queries the domain objects are stored in the result set;
+    // for client/server queries PdxInstances are stored in the result set.
+    boolean getDomainObjectForPdx;
+    // Indicates whether results from remote nodes need to be deserialized
+    // for local queries.
+    boolean getDeserializedObject = false;
+    int numElementsInResult = 0;
+    
+    ObjectType elementType = this.cumulativeResults.getCollectionType().getElementType();
+    boolean isStruct = elementType != null && elementType.isStructType();
     final DistributedMember me = this.pr.getMyId();
 
     if (DefaultQuery.testHook != null) {
       DefaultQuery.testHook.doTestHook(4);
     }
+  
     boolean localResults = false;
-    for (Map.Entry<InternalDistributedMember, Collection<Collection>> e : this.resultsPerMember.entrySet()) {
+    
+    List<CumulativeNonDistinctResults.Metadata> collectionsMetadata =null;
+    List<Collection> results = null;
+    
+    if(isDistinct) {
+      if(isStruct) {
+        StructType stype = (StructType)elementType;
+        this.cumulativeResults = new StructSet(stype);
+      }else {
+        this.cumulativeResults = new ResultsSet(elementType);
+      }
+    }else {
+      collectionsMetadata = new ArrayList<CumulativeNonDistinctResults.Metadata>();
+      results =  new ArrayList<Collection>();
+    }
+    
+    for (Map.Entry<InternalDistributedMember, Collection<Collection>> e : this.resultsPerMember
+        .entrySet()) {
       checkLowMemory();
       // If its a local query, the results should contain domain objects.
       // in case of client/server query the objects from PdxInstances were
@@ -687,107 +786,53 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       } else {
         // In case of remote nodes, the result objects are in PdxInstance form
         // get domain objects for local queries.
-        getDomainObjectForPdx = !(this.pr.getCache().getPdxReadSerializedByAnyGemFireServices());
+        getDomainObjectForPdx = !(this.pr.getCache()
+            .getPdxReadSerializedByAnyGemFireServices());
         // In case of select * without where clause the results from remote
         // nodes are sent in serialized form. For non client queries we need to
         // deserialize the value
-        if(!getDeserializedObject && !((DefaultQuery)this.query).isKeepSerialized()){
+        if (!getDeserializedObject
+            && !((DefaultQuery) this.query).isKeepSerialized()) {
           getDeserializedObject = true;
         }
       }
 
       final boolean isDebugEnabled = logger.isDebugEnabled();
-      
-      for (Collection res : e.getValue()) {
-        checkLowMemory();
-        //final TaintableArrayList res = (TaintableArrayList) e.getValue();      
-        if (res != null) {
-          if (isDebugEnabled) {
-            logger.debug("Query Result from member :{}: {}", e.getKey(), res.size());
-          }
-          if (limit == -1 && !getDomainObjectForPdx && !getDeserializedObject && 
-              (!isDistinct && localResults /* This check is to convert PdxString in projection lists to String */)) {
-            this.cumulativeResults.addAll(res);
-          } else {
+      if (!isDistinct) {
+        CumulativeNonDistinctResults.Metadata wrapper = CumulativeNonDistinctResults
+            .getCollectionMetadata(getDomainObjectForPdx,
+                getDeserializedObject, localResults);
+       
+          for (Collection res : e.getValue()) {
+            results.add(res);
+            collectionsMetadata.add(wrapper);    
+          }        
+      } else {
+        for (Collection res : e.getValue()) {
+          checkLowMemory();
+          // final TaintableArrayList res = (TaintableArrayList) e.getValue();
+          if (res != null) {
+            if (isDebugEnabled) {
+              logger.debug("Query Result from member :{}: {}", e.getKey(),
+                  res.size());
+            }
+
             if (numElementsInResult == limit) {
               break;
             }
+            boolean[] objectChangedMarker = new boolean[1];
+            
             for (Object obj : res) {
               checkLowMemory();
               int occurence = 0;
-              if (isStructBag) {
-                StructImpl simpl = (StructImpl) obj;
-                if (getDomainObjectForPdx) {
-                  try {
-                    if (simpl.isHasPdx()) {
-                      occurence = ((ResultsBag) this.cumulativeResults).addAndGetOccurence(simpl.getPdxFieldValues());
-                    } else {
-                      occurence = ((ResultsBag) this.cumulativeResults).addAndGetOccurence(simpl.getFieldValues());
-                    }
-                  } catch (Exception ex) {
-                    throw new QueryException(
-                        "Unable to retrieve domain object from PdxInstance while building the ResultSet. "
-                        + ex.getMessage());
-                  }
-                } else {
-                  Object[] values = simpl.getFieldValues();
-                  if(getDeserializedObject){
-                    for (int i = 0; i < values.length; i++) {
-                      if(values[i] instanceof VMCachedDeserializable){
-                        values[i] = ((VMCachedDeserializable)values[i]).getDeserializedForReading();
-                      }
-                    }
-                  }
-                  /* This is to convert PdxString to String */                
-                  if (simpl.isHasPdx() && isDistinct && localResults) {
-                    for (int i = 0; i < values.length; i++) {
-                      if(values[i] instanceof PdxString){
-                        values[i] = ((PdxString)values[i]).toString();
-                      }
-                    }
-                  }
-                  occurence = ((ResultsBag) this.cumulativeResults).addAndGetOccurence(values);
-                }
-              } else {
-                if (getDomainObjectForPdx) {
-                  if(obj instanceof PdxInstance){
-                    try {
-                      obj = ((PdxInstance) obj).getObject();
-                    } catch (Exception ex) {
-                      throw new QueryException(
-                          "Unable to retrieve domain object from PdxInstance while building the ResultSet. "
-                          + ex.getMessage());
-                    }
-                  }
-                  else if (obj instanceof PdxString){
-                    obj = ((PdxString)obj).toString();
-                  }
-                } else if (isDistinct && localResults && obj instanceof PdxString) {
-                  /* This is to convert PdxString to String */
-                  obj = ((PdxString)obj).toString();
-                }
-                
-                if (isResultBag) {
-                  if(getDeserializedObject && obj instanceof VMCachedDeserializable) {
-                    obj = ((VMCachedDeserializable)obj).getDeserializedForReading();
-                  }
-                    occurence = ((ResultsBag) this.cumulativeResults)
-                  .addAndGetOccurence(obj);
-                } else {
-                  if(getDeserializedObject && obj instanceof VMCachedDeserializable) {
-                      obj = ((VMCachedDeserializable)obj).getDeserializedForReading();
-                  } 
-                  
-                  // Resultset or StructSet, SortedResultSet, SortedStructSet.
-                  // Once we start passing Object[] in the List , the below should
-                  // change for StructSet and possibly SortedStructSet
-                  occurence = this.cumulativeResults.add(obj) ? 1 : 0;
-                }
-              }
-
+              obj = PDXUtils.convertPDX(obj, isStruct,
+                  getDomainObjectForPdx, getDeserializedObject, localResults, objectChangedMarker, true);
+              boolean elementGotAdded = isStruct? ((StructSet)this.cumulativeResults).addFieldValues((Object[])obj):
+                this.cumulativeResults.add(obj);
+              occurence = elementGotAdded ? 1 : 0;
               // Asif: (Unique i.e first time occurence) or subsequent occurence
               // for non distinct query
-              if (occurence == 1 || (occurence > 1 && !isDistinct)) {
+              if (occurence == 1) {
                 ++numElementsInResult;
                 // Asif:Check again to see if this addition caused limit to be
                 // reached so that current loop will not iterate one more
@@ -801,18 +846,29 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         }
       }
     }
-    
-    if (prQueryTraceInfoList != null && this.query.isTraced() && logger.isInfoEnabled()) {
+
+    if (prQueryTraceInfoList != null && this.query.isTraced()
+        && logger.isInfoEnabled()) {
       if (DefaultQuery.testHook != null) {
         DefaultQuery.testHook.doTestHook("Create PR Query Trace String");
       }
       StringBuilder sb = new StringBuilder();
-      sb.append(LocalizedStrings.PartitionedRegion_QUERY_TRACE_LOG.toLocalizedString(this.query.getQueryString())).append("\n");
-      for (PRQueryTraceInfo queryTraceInfo: prQueryTraceInfoList) {
+      sb.append(
+          LocalizedStrings.PartitionedRegion_QUERY_TRACE_LOG
+              .toLocalizedString(this.query.getQueryString())).append("\n");
+      for (PRQueryTraceInfo queryTraceInfo : prQueryTraceInfoList) {
         sb.append(queryTraceInfo.createLogLine(me)).append("\n");
       }
-      logger.info(sb.toString());;
+      logger.info(sb.toString());
+      ;
     }
+    if (!isDistinct) {
+      this.cumulativeResults =  new CumulativeNonDistinctResults(results, limit,
+          this.cumulativeResults.getCollectionType().getElementType(),
+          collectionsMetadata);
+
+    } 
+    return this.cumulativeResults;
   }
   
   private void checkLowMemory() {
@@ -827,6 +883,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     }
   }
 
+  
 
   /**
    * Adds all counts from all member buckets to cumulative results.
@@ -865,211 +922,29 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
    * This is added as quick turn-around, this is added based on most commonly used
    * queries, needs to be investigated further.
    */   
-  private void buildSortedResult(CompiledSelect cs, int limit) throws QueryException {
-    List projAttrs = cs.getProjectionAttributes();
-    List orderByAttrs = cs.getOrderByAttrs();
-    //boolean isCount = cs.isCount();
-    //List pIterators = cs.getIterators();
-    
-    String eMsg = "Unable to apply order-by on the partition region cumulative results.";
-    Assert.assertTrue(!(orderByAttrs == null), eMsg + " Null order-by attributes.");
-    List iterators = cs.getIterators();
-   
-    String projFields = null;
-    //Map<String, Boolean> orderByFields = new LinkedHashMap<String,Boolean>();
-    List<String> projectionFields = new LinkedList<String>();
+  private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws QueryException {
     
     try {
-      // Evaluate projection attributes.
-      //Create local execution context
-    //If order by clause is present , then compute dependency locally
-      
-      ExecutionContext localContext = new QueryExecutionContext(this.parameters,
+     ExecutionContext localContext = new QueryExecutionContext(this.parameters,
           this.pr.cache);
 
-      localContext.newScope(0);
-
-      Iterator iter = iterators.iterator();
-      while (iter.hasNext()) {
-
-        CompiledIteratorDef iterDef = (CompiledIteratorDef)iter.next();
-        // compute dependencies on this iter first before adding its
-        // RuntimeIterator to the current scope.
-        // this makes sure it doesn't bind attributes to itself
-        localContext.addDependencies(cs, iterDef
-            .computeDependencies(localContext));
-        RuntimeIterator rIter = iterDef.getRuntimeIterator(localContext);
-        localContext.addToIndependentRuntimeItrMap(iterDef);
-        localContext.bindIterator(rIter);
-
-      }
-
-      ObjectType type = cs.prepareResultType(localContext);
-      StringBuffer tempQueryBuffer = new StringBuffer(" order by ");
-      if (type.isStructType()) {
-        StructType structType = (StructType)type;
-        String[] fieldNames = structType.getFieldNames();
-        if (projAttrs == null) {
-          // Evaluate path iterators, in case of multiple paths appropriate
-          // alias needs to be added.
-          // E.g.: select distinct * from /r p, p.positions.values pos order by
-          // p.ID =>
-          // select distinct * from $1 m order by r.p.ID
-          List<RuntimeIterator> runTimeItrs = localContext
-              .getCurrentIterators();
-          Iterator<RuntimeIterator> itr = runTimeItrs.iterator();
-          while (itr.hasNext()) {
-            StringBuffer temp = new StringBuffer();
-            RuntimeIterator rIter = itr.next();
-            rIter.setIndexInternalID(null);
-            rIter.generateCanonicalizedExpression(temp, localContext);
-            projectionFields.add(temp.toString());
-          }
-
-        }
-        else {
-          Iterator<Object[]> itr = projAttrs.iterator();
-          while (itr.hasNext()) {
-            StringBuffer temp = new StringBuffer();
-            Object[] values = itr.next();
-            ((CompiledValue)values[1]).generateCanonicalizedExpression(temp,
-                localContext);
-            projectionFields.add(temp.toString());
-          }
-        }
-        // Evaluate order by attributes.
-        for (int i = 0; i < orderByAttrs.size(); i++) {
-          Object o = orderByAttrs.get(i);
-          if (o instanceof CompiledSortCriterion) {
-            CompiledSortCriterion csc = (CompiledSortCriterion)o;
-            CompiledValue cv = csc.getExpr();
-            StringBuffer temp = new StringBuffer();
-            cv.generateCanonicalizedExpression(temp, localContext);
-            Iterator<String> projFieldItr = projectionFields.iterator();
-            int index = 0;
-            boolean foundMatch = false;
-            String orderBy = temp.toString();
-            while (projFieldItr.hasNext() && !foundMatch) {
-              String projStr = projFieldItr.next();
-              // int indexOfDot = orderBy.indexOf('.');
-              if (orderBy.equals(projStr)) {
-                // exact match , just append the field name
-                tempQueryBuffer.append(' ');
-                tempQueryBuffer.append(fieldNames[index]);
-                tempQueryBuffer.append(' ');
-                tempQueryBuffer.append(csc.getCriterion() ? " desc " : " asc ");
-                tempQueryBuffer.append(',');
-                foundMatch = true;
-              }
-              else if (orderBy.startsWith(projStr)) {
-                tempQueryBuffer.append(fieldNames[index]);
-                tempQueryBuffer.append(temp.substring(projStr.length()));
-
-                tempQueryBuffer.append(' ');
-                tempQueryBuffer.append(csc.getCriterion() ? " desc " : " asc ");
-                tempQueryBuffer.append(',');
-                foundMatch = true;
-              }
-              ++index;
-            }
-            if (!foundMatch) {
-              throw new QueryException("Order by clause " + orderBy
-                  + " not derivable from any projection attribute");
-            }
-
-            // orderByFields.put(temp.toString(), !csc.getCriterion());
-          }
-        }
-        tempQueryBuffer.deleteCharAt(tempQueryBuffer.length() - 1);
-
-      }
-      else {
-        String projStr = null;
-        if (projAttrs == null) {
-          List<RuntimeIterator> runTimeItrs = localContext
-              .getCurrentIterators();
-          Iterator<RuntimeIterator> itr = runTimeItrs.iterator();
-
-          StringBuffer temp = new StringBuffer();
-          RuntimeIterator rIter = itr.next();
-          rIter.setIndexInternalID(null);
-          rIter.generateCanonicalizedExpression(temp, localContext);
-          projStr = temp.toString();
-        }
-
-        else {
-          Iterator<Object[]> itr = projAttrs.iterator();
-          StringBuffer temp = new StringBuffer();
-          Object[] values = itr.next();
-          ((CompiledValue)values[1]).generateCanonicalizedExpression(temp,
-              localContext);
-          projStr = temp.toString();
-
-        }
-        // Evaluate order by attributes.
-        for (int i = 0; i < orderByAttrs.size(); i++) {
-          Object o = orderByAttrs.get(i);
-
-          if (o instanceof CompiledSortCriterion) {
-            CompiledSortCriterion csc = (CompiledSortCriterion)o;
-            CompiledValue cv = csc.getExpr();
-            StringBuffer temp = new StringBuffer();
-            cv.generateCanonicalizedExpression(temp, localContext);
-
-            String orderBy = temp.toString();
-            // int indexOfDot = temp.indexOf(".");
-
-            if (orderBy.equals(projStr)) {
-              // exact match , just append the field name
-              tempQueryBuffer.append(' ');
-              tempQueryBuffer.append("iter");
-              tempQueryBuffer.append(' ');
-
-            }
-            else if (orderBy.startsWith(projStr)) {
-              tempQueryBuffer.append(' ');
-              String attr = temp.substring(projStr.length() + 1);
-              // escape reserved keywords
-              attr = checkReservedKeyword(attr);
-              tempQueryBuffer.append(attr);
-            }
-            else {
-              throw new QueryException("Order by clause " + orderBy
-                  + " not derivable from projection attribute " + projStr);
-            }
-
-            tempQueryBuffer.append(' ');
-            tempQueryBuffer.append(csc.getCriterion() ? " desc " : " asc ");
-            tempQueryBuffer.append(',');
-          }
-        }
-        tempQueryBuffer.deleteCharAt(tempQueryBuffer.length() - 1);
-      }
-
-      tempQueryBuffer.insert(0, " SELECT DISTINCT * FROM $1 iter ");
       
-      if (logger.isDebugEnabled()) {
-        logger.debug("The temp query generated to evaluate order-by on PR commulative results: {}", tempQueryBuffer.toString());
-      }
-
-      DefaultQuery q = (DefaultQuery)this.pr.getCache().getQueryService()
-          .newQuery(tempQueryBuffer.toString());
-      ExecutionContext context;
-      
-      final DistributedMember me = this.pr.getMyId();
-
+      List<Collection> allResults = new ArrayList<Collection>();
       for (Collection<Collection> memberResults : this.resultsPerMember.values()) {
         for (Collection res : memberResults) {
           if (res != null) {
-            context = new QueryExecutionContext((new Object[] { res }), this.pr
-                .getCache(), this.cumulativeResults, q);
-            q.executeUsingContext(context);
+            allResults.add(res);
           }
         }
       }
+      
+      this.cumulativeResults = new NWayMergeResults(allResults, cs.isDistinct(), limit, cs.getOrderByAttrs(), 
+          localContext, cs.getElementTypeForOrderByQueries());
+      return this.cumulativeResults;
     } catch (Exception ex) {
       throw new QueryException("Unable to apply order-by on the partition region cumulative results.", ex);
     }
+    
   }
 
   //returns attribute with escape quotes #51085 and #51886
@@ -1101,13 +976,13 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
   private String getQueryAttributes(CompiledValue cv, StringBuffer fromPath) throws QueryException {
     // field with multiple level like p.pos.secId
     String clause = "";
-    if (cv instanceof CompiledID)  {
+    if (cv.getType() == OQLLexerTokenTypes.Identifier)  {
       // It will be p.pos.secId
       clause = ((CompiledID)cv).getId() + clause;
     } else {
       do {
-        if (cv instanceof CompiledPath || cv instanceof CompiledIndexOperation) {
-          if (cv instanceof CompiledIndexOperation) {
+        if (cv.getType() == CompiledPath.PATH || cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) {
+          if (cv.getType() == OQLLexerTokenTypes.TOK_LBRACK) {
             CompiledIndexOperation cio = (CompiledIndexOperation)cv;
             CompiledLiteral cl = (CompiledLiteral)cio.getExpression();
             StringBuffer sb = new StringBuffer();
@@ -1118,7 +993,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
             }
           }
           clause = ("." + ((CompiledPath)cv).getTailID() + clause);
-        } else if (cv instanceof CompiledOperation) {
+        } else if (cv.getType() == OQLLexerTokenTypes.METHOD_INV) {
           // Function call.
           clause = "." + ((CompiledOperation)cv).getMethodName() + "()" + clause;
         } else {
@@ -1127,9 +1002,9 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         }
 
         cv = cv.getReceiver();
-      } while (!(cv instanceof CompiledID));
+      } while (!(cv.getType() == OQLLexerTokenTypes.Identifier));
 
-      if (cv instanceof CompiledID) {
+      if (cv.getType() == OQLLexerTokenTypes.Identifier) {
         clause = ((CompiledID)cv).getId() + clause;
         // Append region iterator alias. p
         if (fromPath != null) {
@@ -1140,161 +1015,6 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
     return clause;
   }
 
-  private void buildSortedResultBackup(CompiledSelect cs, int limit) throws QueryException {
-    List projAttrs = cs.getProjectionAttributes();
-    List orderByAttrs = cs.getOrderByAttrs();
-    List pIterators = cs.getIterators();
-    //boolean isDistinct = (cs != null)? cs.isDistinct():true;
-    
-    String eMsg = "Unable to apply order-by on the partition region cumulative results.";
-    Assert.assertTrue(!(orderByAttrs == null), eMsg + " Null order-by attributes.");
-    
-    StringBuffer fromPath =  new StringBuffer();
-    String projFields = null;
-    HashMap<String, String> orderByFields = new HashMap<String, String>();
-    
-    try {
-      // Evaluate projection attributes.
-      String fromIter = "";
-      if (projAttrs == null) {
-        // Evaluate path iterators, in case of multiple paths appropriate alias needs to be added.
-        // E.g.: select distinct * from /r p, p.positions.values pos order by p.ID =>
-        //         select distinct * from $1 m order by r.p.ID
-        for (int i = 1; i < pIterators.size(); i++) {
-          CompiledIteratorDef iterDef = (CompiledIteratorDef) pIterators.get(i-1);   
-          fromIter += (iterDef.getName() + ("."));
-        }
-      } else if (projAttrs.size() == 1) {
-        // In case single projections, it should be treated as the ordered field.
-        // E.g: select distinct status from /r order by status => select distinct * from $1 p order by p 
-        Object projDef[] = (Object[])projAttrs.get(0);
-        if (projDef[1] instanceof CompiledID) {
-          projFields = ((CompiledID)projDef[1]).getId();
-        } else if (projDef[1] instanceof CompiledPath) {
-          CompiledPath cp = (CompiledPath)projDef[1];
-          projFields = ((CompiledID)cp.getReceiver()).getId() + "." + cp.getTailID();
-        } else if (projDef[1] instanceof CompiledOperation) {
-          // Function call.
-          CompiledOperation cp = (CompiledOperation)projDef[1];
-          projFields = ((CompiledID)cp.getReceiver(null)).getId() + "." + cp.getMethodName() + "()";
-        } else {
-          throw new QueryException("Failed to evaluate projection attributes. " + eMsg);
-        }
-      }
-     
-      // Evaluate order by attributes.
-      for (int i = 0; i < orderByAttrs.size(); i++) {
-        Object o = orderByAttrs.get(i);
-        String orderByClause = "";
-        if (o instanceof CompiledSortCriterion) {
-          CompiledSortCriterion csc = (CompiledSortCriterion)o;
-          CompiledValue cv = csc.getExpr();  
-          
-          // field with multiple level like p.pos.secId
-          if (cv instanceof CompiledID)  {
-            // It will be p.pos.secId
-            orderByClause = ((CompiledID)cv).getId() + orderByClause;
-          } else {
-            do {
-            if (cv instanceof CompiledPath || cv instanceof CompiledIndexOperation) {
-              if (cv instanceof CompiledIndexOperation) {
-                CompiledIndexOperation cio = (CompiledIndexOperation)cv;
-                CompiledLiteral cl = (CompiledLiteral)cio.getExpression();
-                StringBuffer sb = new StringBuffer();
-                cl.generateCanonicalizedExpression(sb, null);
-                cv = ((CompiledIndexOperation)cv).getReceiver();
-                if (sb.length() > 0) {
-                  orderByClause = "[" + sb.toString() + "]" + orderByClause;
-                }
-              }
-              orderByClause = ("." + ((CompiledPath)cv).getTailID() + orderByClause);
-            } else if (cv instanceof CompiledOperation) {
-              // Function call.
-              orderByClause = "." + ((CompiledOperation)cv).getMethodName() + "()" + orderByClause;
-            } else if (cv instanceof CompiledIndexOperation) {
-              StringBuffer sb = new StringBuffer();
-              
-              //((CompiledIndexOperation)cv).generateCanonicalizedExpression(sb, null);
-              //if (cv2 instanceof CompiledPath) {
-              
-              //orderByClause = "." + ((CompiledIndexOperation)cv).+ "()";
-              //}
-            } else {
-              throw new QueryException("Failed to evaluate order by attributes, found unsupported type  " + cv.getType() + " " + eMsg);
-            }
-            // Ignore subsequent paths.
-            //do {
-                cv = cv.getReceiver();
-            } while (!(cv instanceof CompiledID));
-            
-            if (cv instanceof CompiledID) {
-              orderByClause = ((CompiledID)cv).getId() + orderByClause;
-              // Append region iterator alias. p
-              if (i == 0) {
-                fromPath.append(((CompiledID)cv).getId());
-                //if ((i+1) < orderByAttrs.size()) {
-                //  fromPath.append(", ");
-                //}
-              }
-            }
-            
-          } 
-          /*
-          else if (cv instanceof CompiledOperation) {
-            orderByClause = ((CompiledID)cv).getId() + orderByClause;
-          } else {
-            throw new QueryException("Failed to evaluate order-by attributes. " + eMsg);              
-          }
-          */
-          orderByFields.put(fromIter + orderByClause, (csc.getCriterion()? " desc " : " asc "));
-        }
-      }
-
-      StringBuffer tmpSortQuery =  new StringBuffer("SELECT DISTINCT * FROM $1 ");    
-      if (projFields != null && orderByFields.containsKey(projFields)) {
-        // Select distinct p.status from /region p order by p.status asc
-        // => Select distinct * from $1 p order by p asc
-        if (fromPath.length() > 0) {
-          tmpSortQuery.append(fromPath).append(" ORDER BY ").append(fromPath).append(" ").append(orderByFields.get(projFields));
-        } else {
-          tmpSortQuery.append(fromPath).append("p ORDER BY p").append(orderByFields.get(projFields));
-        }
-      } else {
-        /*
-        if (fromPath.length() > 0) {
-          tmpSortQuery.append(fromPath).append(" ORDER BY ").append(fromPath).append(".");
-        } else {
-        */
-        tmpSortQuery.append(fromPath).append(" ORDER BY ");
-        //}
-        Iterator iter = orderByFields.entrySet().iterator();
-        while (iter.hasNext()) {
-          Map.Entry<String, String> e = (Map.Entry<String, String>)iter.next();
-          tmpSortQuery.append(e.getKey()).append(" ").append(e.getValue());
-          if (iter.hasNext()) {
-            tmpSortQuery.append(", ");
-          }
-        }
-      }
-
-      if (logger.isDebugEnabled()) {
-        logger.debug("The temp query generated to evaluate order-by on PR commulative results: {}", tmpSortQuery);
-      }
-      
-      DefaultQuery q = (DefaultQuery)this.pr.getCache().getQueryService().newQuery(tmpSortQuery.toString());
-      ExecutionContext context;
-      for (Iterator i=this.resultsPerMember.values().iterator(); i.hasNext(); ) {
-        final TaintableArrayList res = (TaintableArrayList)i.next();
-        if (res!=null && res.isConsumable()) {
-          context = new QueryExecutionContext((new Object[] {res}), this.pr.getCache(), this.cumulativeResults, q);
-          q.executeUsingContext(context);
-          res.clear();
-        }
-      }   
-    } catch (Exception ex) {
-      throw new QueryException("Unable to apply order-by on the partition region cumulative results.", ex);
-    }
-  }
   /**
    * Generates a map with key as PR node and value as the list as a subset of
    * the bucketIds hosted by the node.
@@ -1303,7 +1023,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
    */
   
   // (package access, and returns map for unit test purposes)
-  Map buildNodeToBucketMap() throws QueryException
+  Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMap() throws QueryException
   {
     return buildNodeToBucketMapForBuckets(this.bucketsToQuery);
   }
@@ -1312,17 +1032,17 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
    * @param bucketIdsToConsider
    * @return Map of {@link InternalDistributedMember} to {@link ArrayList} of Integers
    */
-  private Map buildNodeToBucketMapForBuckets(final Set<Integer> bucketIdsToConsider) 
+  private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(final Set<Integer> bucketIdsToConsider) 
   throws QueryException {
     
-    final HashMap<InternalDistributedMember, ArrayList<Integer>> ret = new 
-    HashMap<InternalDistributedMember, ArrayList<Integer>>();
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new 
+    HashMap<InternalDistributedMember,List<Integer>>();
     
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
 
-    final ArrayList<Integer> bucketIds = new ArrayList<Integer>();
+    final List<Integer> bucketIds = new ArrayList<Integer>();
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
     final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
@@ -1367,7 +1087,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       }
       */
       
-      final ArrayList<Integer> buckets = new ArrayList<Integer>();
+      final List<Integer> buckets = new ArrayList<Integer>();
       for (Integer bid : bucketIdsToConsider) {
         if (!bucketIds.contains(bid)) {
           final Set owners = pr.getRegionAdvisor().getBucketOwners(bid.intValue());
@@ -1416,7 +1136,7 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       //Create PRQueryResultCollector here.
       //RQueryResultCollector resultCollector = new PRQueryResultCollector();
 
-      List bucketList = (List)this.node2bucketIds.get(me);
+      List<Integer> bucketList = this.node2bucketIds.get(me);
       //try {
         
         //this.pr.getDataStore().queryLocalNode(this.query, this.parameters,
@@ -1628,11 +1348,11 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
   }
   
   private InternalDistributedMember findNodeForBucket(Integer bucketId) {
-    for (Iterator itr = this.node2bucketIds.entrySet().iterator(); itr.hasNext(); ) {
-      Map.Entry entry = (Map.Entry)itr.next();
-      List blist = (List)entry.getValue();
-      for (Iterator itr2 = blist.iterator(); itr2.hasNext(); ) {
-        Integer bid = (Integer)itr2.next();
+    for (Iterator<Map.Entry<InternalDistributedMember,List<Integer>>> itr = this.node2bucketIds.entrySet().iterator(); itr.hasNext(); ) {
+      Map.Entry<InternalDistributedMember,List<Integer>> entry = itr.next();
+      List<Integer> blist = entry.getValue();
+      for (Iterator<Integer> itr2 = blist.iterator(); itr2.hasNext(); ) {
+        Integer bid = itr2.next();
         if (bid.equals(bucketId)) {
           return (InternalDistributedMember)entry.getKey();
         }
@@ -1751,5 +1471,8 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
       }          
     }  
     
+    public ObjectType getResultType() {
+      return PartitionedRegionQueryEvaluator.this.cumulativeResults.getCollectionType().getElementType();
+    }
   }
 }
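
[Editor's note] The change above replaces the old approach of re-running a temporary "SELECT DISTINCT * FROM $1 ... ORDER BY ..." query over the cumulative results with NWayMergeResults, which merges the already-sorted collections streamed back from each member, and it adds a compatibility path that locally sorts results arriving from pre-GFE_90 members. The following is only an illustrative sketch of the general k-way merge idea, not Geode's NWayMergeResults implementation; the class and method names are invented, and it assumes every input list is already ordered by the same Comparator.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: merge per-member lists that are each already sorted by
// the same comparator into one ordered result, optionally honoring a limit,
// without re-sorting the combined data.
final class KWayMergeSketch {

  static <T> List<T> merge(List<List<T>> sortedInputs, Comparator<T> cmp, int limit) {
    // A cursor remembers which input list it reads from and its next position.
    final class Cursor {
      final List<T> src;
      int pos;
      Cursor(List<T> src) { this.src = src; }
      T head() { return src.get(pos); }
    }
    PriorityQueue<Cursor> heap =
        new PriorityQueue<>((a, b) -> cmp.compare(a.head(), b.head()));
    for (List<T> in : sortedInputs) {
      if (!in.isEmpty()) {
        heap.add(new Cursor(in));
      }
    }
    List<T> out = new ArrayList<>();
    // limit < 0 mirrors the diff's convention that -1 means "no limit".
    while (!heap.isEmpty() && (limit < 0 || out.size() < limit)) {
      Cursor c = heap.poll();
      out.add(c.head());
      if (++c.pos < c.src.size()) {
        heap.add(c); // re-insert with the cursor advanced to its next element
      }
    }
    return out;
  }
}

Distinctness and PDX/domain-object conversion are handled separately in the real code (CumulativeNonDistinctResults, PDXUtils.convertPDX); the sketch only covers the ordering aspect.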

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PeerTXStateStub.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PeerTXStateStub.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PeerTXStateStub.java
index ab68a80..7831406 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PeerTXStateStub.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PeerTXStateStub.java
@@ -30,17 +30,16 @@ import com.gemstone.gemfire.internal.logging.LogService;
 
 public class PeerTXStateStub extends TXStateStub {
 
-  private static final Logger logger = LogService.getLogger();
+  protected static final Logger logger = LogService.getLogger();
   
   private InternalDistributedMember originatingMember = null;
-  private TXCommitMessage commitMessage = null;
+  protected TXCommitMessage commitMessage = null;
 
   public PeerTXStateStub(TXStateProxy stateProxy, DistributedMember target,InternalDistributedMember onBehalfOfClient) {
     super(stateProxy, target);
     this.originatingMember = onBehalfOfClient;
   }
   
-  
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#rollback()
    */
@@ -154,14 +153,12 @@ public class PeerTXStateStub extends TXStateStub {
     }
   }
 
-
-  private void cleanup() {
+  protected void cleanup() {
     for (TXRegionStub regionStub : regionStubs.values()) {
       regionStub.cleanup();
     }
   }
 
-
   @Override
   protected TXRegionStub generateRegionStub(LocalRegion region) {
       TXRegionStub stub = null;
@@ -174,9 +171,6 @@ public class PeerTXStateStub extends TXStateStub {
       return stub;
   }
 
-
-
-
   @Override
   protected void validateRegionCanJoinTransaction(LocalRegion region)
       throws TransactionException {
@@ -212,7 +206,6 @@ public class PeerTXStateStub extends TXStateStub {
     }
   }
 
-
   public InternalDistributedMember getOriginatingMember() {
     /*
      * This needs to be set to the clients member id if the client originated the tx
@@ -227,25 +220,28 @@ public class PeerTXStateStub extends TXStateStub {
     this.originatingMember = clientMemberId;
   }
 
+  @Override
   public boolean isMemberIdForwardingRequired() {
     return getOriginatingMember()!=null;
   }
 
-
+  @Override
   public TXCommitMessage getCommitMessage() {
     return commitMessage;
   }
 
-
+  @Override
   public void suspend() {
     // no special tasks to perform
   }
 
 
+  @Override
   public void resume() {
     // no special tasks to perform
   }
 
+  @Override
   public void recordTXOperation(ServerRegionDataAccess region, ServerRegionOperation op, Object key, Object arguments[]) {
     // no-op here
   }
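
[Editor's note] The PeerTXStateStub changes are mostly mechanical: logger, commitMessage, and cleanup() go from private to protected, and @Override annotations are added, which allows a subclass to reuse and extend the stub's behavior. A minimal hypothetical sketch of what the widened visibility enables; the subclass below is invented for illustration and is not part of this commit, and it assumes placement in the same package as PeerTXStateStub.

package com.gemstone.gemfire.internal.cache; // assumed, to sit alongside PeerTXStateStub

import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;

// Hypothetical subclass relying on the protected members introduced above.
public class TracingPeerTXStateStub extends PeerTXStateStub {

  public TracingPeerTXStateStub(TXStateProxy stateProxy, DistributedMember target,
      InternalDistributedMember onBehalfOfClient) {
    super(stateProxy, target, onBehalfOfClient);
  }

  @Override
  protected void cleanup() {
    // protected access lets the subclass see the inherited logger and commitMessage
    logger.debug("cleaning up stub, last commit message: {}", commitMessage);
    super.cleanup();
  }
}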

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PersistentOplogSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PersistentOplogSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PersistentOplogSet.java
index 3745184..002c70c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PersistentOplogSet.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PersistentOplogSet.java
@@ -29,6 +29,7 @@ import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.cache.DiskAccessException;
 import com.gemstone.gemfire.internal.FileUtil;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.OplogEntryIdSet;
 import com.gemstone.gemfire.internal.cache.persistence.DiskRecoveryStore;
 import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView;
@@ -174,15 +175,15 @@ public class PersistentOplogSet implements OplogSet {
   }
   
   @Override
-  public void create(LocalRegion region, DiskEntry entry, byte[] value,
-      boolean isSerializedObject, boolean async) {
-    getChild().create(region, entry, value, isSerializedObject, async);
+  public void create(LocalRegion region, DiskEntry entry, ValueWrapper value,
+      boolean async) {
+    getChild().create(region, entry, value, async);
   }
   
   @Override
-  public void modify(LocalRegion region, DiskEntry entry, byte[] value,
-      boolean isSerializedObject, boolean async) {
-    getChild().modify(region, entry, value, isSerializedObject, async);
+  public void modify(LocalRegion region, DiskEntry entry, ValueWrapper value,
+      boolean async) {
+    getChild().modify(region, entry, value, async);
   }
   
   public void offlineModify(DiskRegionView drv, DiskEntry entry, byte[] value, boolean isSerializedObject) {
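
[Editor's note] The create/modify signature change above collapses the (byte[] value, boolean isSerializedObject) pair into a single DiskEntry.Helper.ValueWrapper argument. Below is a rough illustration of the parameter-object pattern being adopted; it is a hypothetical sketch, not the actual ValueWrapper API.

// Hypothetical sketch of a value wrapper: the bytes and the "already a
// serialized object" flag travel together instead of as two loose parameters.
// Geode's real DiskEntry.Helper.ValueWrapper exposes a different, richer interface.
final class ValueWrapperSketch {
  private final byte[] bytes;
  private final boolean serializedObject;

  ValueWrapperSketch(byte[] bytes, boolean serializedObject) {
    this.bytes = bytes;
    this.serializedObject = serializedObject;
  }

  byte[] getBytes() { return bytes; }

  boolean isSerializedObject() { return serializedObject; }
}

Bundling the flag with the payload keeps the two consistent across every OplogSet implementation and avoids changing each signature again if the representation is extended later.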

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PlaceHolderDiskRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PlaceHolderDiskRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PlaceHolderDiskRegion.java
index 9994d5b..1dde07a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PlaceHolderDiskRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PlaceHolderDiskRegion.java
@@ -150,6 +150,14 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
     public RegionMap getRegionMap() {
       return getRecoveredEntryMap();
     }
+    
+    @Override
+    public void close() {
+      RegionMap rm = getRecoveredEntryMap();
+      if (rm != null) {
+        rm.close();
+      }
+    }
 
     public void handleDiskAccessException(DiskAccessException dae) {
       getDiskStore()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
index 7e680fb..289a3f5 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
@@ -9,7 +9,6 @@
 package com.gemstone.gemfire.internal.cache;
 
 import java.util.Collections;
-
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -32,6 +31,7 @@ import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributesHolder;
 import com.gemstone.gemfire.internal.InternalInstantiator;
 import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
@@ -124,14 +124,20 @@ public class PoolManagerImpl {
    */
   public void close(boolean keepAlive) {
     // destroying connection pools
+    boolean foundClientPool = false;
     synchronized(poolLock) {
       for (Iterator<Map.Entry<String,Pool>> itr = pools.entrySet().iterator(); itr.hasNext(); ) {
         Map.Entry<String,Pool> entry = itr.next();
         PoolImpl pool = (PoolImpl)entry.getValue();
         pool.basicDestroy(keepAlive);
+        foundClientPool = true;
       }
       pools = Collections.emptyMap();
       itrForEmergencyClose = null;
+      if (foundClientPool) {
+        // Now that the client has all the pools destroyed free up the pooled comm buffers
+        ServerConnection.emptyCommBufferPool();
+      }
     }
   }
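
[Editor's note] The close(boolean) change above frees the pooled communication buffers via ServerConnection.emptyCommBufferPool() once the last client pool has been destroyed. A minimal sketch of the underlying idea only, not ServerConnection's actual implementation: a shared pool that hands out cached direct ByteBuffers and can be drained when no consumers remain. Names and structure are invented for the example.

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative buffer pool, assuming direct ByteBuffers are cached for reuse.
final class CommBufferPoolSketch {
  private static final ConcurrentLinkedQueue<ByteBuffer> POOL = new ConcurrentLinkedQueue<>();

  static ByteBuffer acquire(int size) {
    ByteBuffer buf = POOL.poll();
    return (buf != null && buf.capacity() >= size) ? buf : ByteBuffer.allocateDirect(size);
  }

  static void release(ByteBuffer buf) {
    buf.clear();
    POOL.add(buf);
  }

  // Analogue of the emptyCommBufferPool() call added above: drop every cached
  // buffer so its memory can be reclaimed instead of lingering for the JVM's lifetime.
  static void emptyPool() {
    POOL.clear();
  }
}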
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyBucketRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyBucketRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyBucketRegion.java
index 3e8880c..48c3e32 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyBucketRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyBucketRegion.java
@@ -122,7 +122,8 @@ public final class ProxyBucketRegion implements Bucket {
                                           partitionedRegion,
                                           partitionedRegion.getAttributes(),
                                           diskFlags, partitionName, startingBucketID,
-                                          partitionedRegion.getCompressor());
+                                          partitionedRegion.getCompressor(),
+                                          partitionedRegion.getOffHeap());
       
       if (fpaList != null) {
         for (FixedPartitionAttributesImpl fpa : fpaList) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index f37f623..b794b57 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -105,6 +105,11 @@ final class ProxyRegionMap implements RegionMap {
     return Collections.emptySet();
   }
 
+  @Override
+  public Collection<RegionEntry> regionEntriesInVM() {
+    return Collections.emptySet();
+  }
+
   public boolean containsKey(Object key) {
     return false;
   }
@@ -113,6 +118,10 @@ final class ProxyRegionMap implements RegionMap {
     return null;
   }
 
+  public RegionEntry putEntryIfAbsent(Object key, RegionEntry re) {
+    return null;
+  }
+
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public Set<VersionSource> clear(RegionVersionVector rvv) {
     // nothing needs to be done
@@ -261,12 +270,18 @@ final class ProxyRegionMap implements RegionMap {
         // fix for bug 39526
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, op,
             key, null, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
+        boolean cbEventInPending = false;
+        try {
         AbstractRegionMap.switchEventOwnerAndOriginRemote(e, txEntryState == null);
         if (pendingCallbacks == null) {
           this.owner
               .invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, e, true/* callDispatchListenerEvent */);
         } else {
           pendingCallbacks.add(e);
+          cbEventInPending = true;
+        }
+        } finally {
+          if (!cbEventInPending) e.release();
         }
       }
     }
@@ -283,15 +298,21 @@ final class ProxyRegionMap implements RegionMap {
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
                                                 true, this.owner.isInitialized())) {
         // fix for bug 39526
+        boolean cbEventInPending = false;
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, 
             localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
             key, newValue, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
+        try {
         AbstractRegionMap.switchEventOwnerAndOriginRemote(e, txEntryState == null);
         if (pendingCallbacks == null) {
           this.owner.invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, e,
               true/* callDispatchListenerEvent */);
         } else {
           pendingCallbacks.add(e);
+          cbEventInPending = true;
+        }
+        } finally {
+          if (!cbEventInPending) e.release();
         }
       }
     }
@@ -311,14 +332,20 @@ final class ProxyRegionMap implements RegionMap {
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
                                                 false, this.owner.isInitialized())) {
         // fix for bug 39526
+        boolean cbEventInPending = false;
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, putOp, key, 
             newValue, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
+        try {
         AbstractRegionMap.switchEventOwnerAndOriginRemote(e, txEntryState == null);
         if (pendingCallbacks == null) {
           this.owner
               .invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, e, true/* callDispatchListenerEvent */);
         } else {
           pendingCallbacks.add(e);
+          cbEventInPending = true;
+        }
+        } finally {
+          if (!cbEventInPending) e.release();
         }
       }
     }
@@ -461,9 +488,19 @@ final class ProxyRegionMap implements RegionMap {
       throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
     }
     
+    @Override
+    public Object getValueRetain(RegionEntryContext context) {
+      throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
+    }
+    
     public void setValue(RegionEntryContext context, Object value) {
       throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
     }
+    
+    @Override
+    public Object prepareValueForCache(RegionEntryContext r, Object val, boolean isEntryUpdate) {
+      throw new IllegalStateException("Should never be called");
+    }
 
 //    @Override
 //    public void _setValue(Object value) {
@@ -480,7 +517,7 @@ final class ProxyRegionMap implements RegionMap {
     }
 
     @Override
-    public Object _getValueUse(RegionEntryContext context, boolean decompress) {
+    public Object _getValueRetain(RegionEntryContext context, boolean decompress) {
       throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
     }
 
@@ -575,7 +612,28 @@ final class ProxyRegionMap implements RegionMap {
     public void setUpdateInProgress(boolean underUpdate) {
       throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
     }
-    
+
+    @Override
+    public boolean isMarkedForEviction() {
+      throw new UnsupportedOperationException(LocalizedStrings
+          .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+              .toLocalizedString(DataPolicy.EMPTY));
+    }
+
+    @Override
+    public void setMarkedForEviction() {
+      throw new UnsupportedOperationException(LocalizedStrings
+          .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+              .toLocalizedString(DataPolicy.EMPTY));
+    }
+
+    @Override
+    public void clearMarkedForEviction() {
+      throw new UnsupportedOperationException(LocalizedStrings
+          .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+              .toLocalizedString(DataPolicy.EMPTY));
+    }
+
     @Override
     public boolean isValueNull() {
       throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
@@ -661,6 +719,12 @@ final class ProxyRegionMap implements RegionMap {
     @Override
     public void resetRefCount(NewLRUClockHand lruList) {
     }
+
+    @Override
+    public Object prepareValueForCache(RegionEntryContext r, Object val,
+        EntryEventImpl event, boolean isEntryUpdate) {
+      throw new IllegalStateException("Should never be called");
+    }
   }
 
   public void lruUpdateCallback(int n) {
@@ -706,7 +770,16 @@ final class ProxyRegionMap implements RegionMap {
   }
 
   @Override
+  public RegionEntry getOperationalEntryInVM(Object key) {
+    return null;
+  }
+
+  @Override
   public int sizeInVM() {
     return 0;
   }
+
+  @Override
+  public void close() {
+  }
 }

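Note: a pattern repeated throughout the ProxyRegionMap hunks above is that the callback event is released in a finally block unless ownership was handed off to the pendingCallbacks list. A simplified, self-contained sketch of that ownership-transfer idiom (Event is a hypothetical stand-in for EntryEventImpl):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch of the "release unless ownership was handed off" idiom used above.
    class CallbackDispatch {
      static final class Event {
        void release() { System.out.println("event released"); }
      }

      static void dispatch(List<Event> pendingCallbacks) {
        Event e = new Event();
        boolean eventInPending = false;
        try {
          if (pendingCallbacks == null) {
            // invoke callbacks immediately; the event is ours to release
            System.out.println("callbacks invoked");
          } else {
            pendingCallbacks.add(e);   // ownership transfers to the pending list
            eventInPending = true;
          }
        } finally {
          if (!eventInPending) {
            e.release();               // free retained (possibly off-heap) references
          }
        }
      }

      public static void main(String[] args) {
        dispatch(null);                // released immediately
        dispatch(new ArrayList<>());   // kept alive for the pending list
      }
    }
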
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
index e0ed20d..88cf6f7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/QueuedOperation.java
@@ -72,9 +72,10 @@ public class QueuedOperation
     else {
       // it is an entry operation
       //TODO :EventID should be passed from the sender & should be reused here
-      EntryEventImpl ee = new EntryEventImpl(
+      EntryEventImpl ee = EntryEventImpl.create(
           lr, this.op, this.key, null,
           this.cbArg, true, src);
+      try {
       //ee.setQueued(true);
       if (this.op.isCreate() || this.op.isUpdate()) {
         UpdateOperation.UpdateMessage.setNewValueInEvent(this.value,
@@ -132,6 +133,9 @@ public class QueuedOperation
       else {
         throw new IllegalStateException(LocalizedStrings.QueuedOperation_THE_0_SHOULD_NOT_HAVE_BEEN_QUEUED.toLocalizedString(this.op));
       }
+      } finally {
+        ee.release();
+      }
     }
   }
 

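Note: here the constructor call is replaced by the EntryEventImpl.create(...) factory and a try/finally that releases the event. One way to think about such a pairing (purely an illustration, not how EntryEventImpl is defined) is an AutoCloseable whose close() plays the role of release(), so try-with-resources can enforce it:

    // Alternative sketch: enforcing create()/release() pairing with AutoCloseable.
    class ReleasableEvent implements AutoCloseable {
      private ReleasableEvent() {}

      static ReleasableEvent create() {    // factory, mirroring EntryEventImpl.create(...)
        return new ReleasableEvent();
      }

      void process() { System.out.println("operation applied"); }

      @Override
      public void close() {                // stands in for release()
        System.out.println("event released");
      }

      public static void main(String[] args) {
        // Release is guaranteed even if process() throws, matching the
        // try/finally added in the diff above.
        try (ReleasableEvent ee = ReleasableEvent.create()) {
          ee.process();
        }
      }
    }
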
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
index 5ac868d..b6e8fab 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
@@ -8,6 +8,9 @@
 
 package com.gemstone.gemfire.internal.cache;
 
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE;
+
 import com.gemstone.gemfire.cache.CacheWriterException;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -19,6 +22,11 @@ import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+import com.gemstone.gemfire.cache.EvictionCriteria;
 
 /**
  * Internal interface for a region entry.
@@ -187,7 +195,8 @@ public interface RegionEntry {
    * @return false if map entry not found
    * @since 3.2.1
    */
-  public boolean fillInValue(LocalRegion r, InitialImageOperation.Entry entry,
+  public boolean fillInValue(LocalRegion r,
+      @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry entry,
       ByteArrayDataInput in, DM mgr);
 
   /**
@@ -203,8 +212,14 @@ public interface RegionEntry {
      *  and returns it
      */
   public Object getValue(RegionEntryContext context);
+  /**
+   * Just like getValue but the result may be a retained off-heap reference.
+   */
+  @Retained
+  public Object getValueRetain(RegionEntryContext context);
 
-  public void setValue(RegionEntryContext context, Object value) throws RegionClearedException;
+  @Released
+  public void setValue(RegionEntryContext context, @Unretained Object value) throws RegionClearedException;
   
   /**
    * This flavor of setValue was added so that the event could be passed down to Helper.writeToDisk.
@@ -214,9 +229,21 @@ public interface RegionEntry {
   public void setValue(RegionEntryContext context, Object value, EntryEventImpl event) throws RegionClearedException;
   /**
    * Obtain and return the value of this entry using {@link #_getValue()}.
+   * If the value is a MemoryChunkWithRefCount then increment its refcount.
+   * WARNING: if a MemoryChunkWithRefCount is returned then the caller MUST
+   * call {@link MemoryChunkWithRefCount#release()}.
+   * 
+   * This is only retained in off-heap subclasses.  However, it's marked as
+   * Retained here so that callers are aware that the value may be retained.
+   * 
    * @param decompress if true returned value will be decompressed if it is compressed
+   * @return possible OFF_HEAP_OBJECT (caller must release)
    */
-  public Object _getValueUse(RegionEntryContext context, boolean decompress);
+  @Retained 
+  public Object _getValueRetain(RegionEntryContext context, boolean decompress);
+  /** Gets the value field of this entry. */
+  
+  @Unretained
   public Object _getValue();
   /**
    * Returns a tokenized form of the value.
@@ -232,7 +259,7 @@ public interface RegionEntry {
    * @param event the cache event that caused this change
    * @throws RegionClearedException thrown if the region is concurrently cleared
    */
-  public void setValueWithTombstoneCheck(Object value, EntryEvent event) throws RegionClearedException;
+  public void setValueWithTombstoneCheck(@Unretained Object value, EntryEvent event) throws RegionClearedException;
   
   /**
    * Returns the value as stored by the RegionEntry implementation.  For instance, if compressed this
@@ -240,6 +267,7 @@ public interface RegionEntry {
    *  
    * @since 8.0
    */
+  @Retained
   public Object getTransformedValue();
   
   /**
@@ -249,6 +277,7 @@ public interface RegionEntry {
    *
    * @see LocalRegion#getValueInVM
    */
+  @Retained
   public Object getValueInVM(RegionEntryContext context);
   /**
    * Returns the value of an entry as it resides on disk.  For
@@ -311,11 +340,12 @@ public interface RegionEntry {
    *   mark the destroy
    * @return true if destroy was done; false if not
    */
+  @Released
   public boolean destroy(LocalRegion region,
                          EntryEventImpl event,
                          boolean inTokenMode,
                          boolean cacheWrite,
-                         Object expectedOldValue,
+                         @Unretained Object expectedOldValue,
                          boolean forceDestroy,
                          boolean removeRecoveredEntry)
     throws CacheWriterException, EntryNotFoundException, TimeoutException, RegionClearedException;
@@ -347,6 +377,7 @@ public interface RegionEntry {
    * return a temporary copy. For SQLFabric this is used during table scans in
    * queries when faulting in every value will be only an unnecessary overhead.
    */
+  @Retained
   public Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner);
 
   /**
@@ -356,6 +387,7 @@ public interface RegionEntry {
    * queries when faulting in every value will be only an unnecessary overhead.
    * The value returned will be kept off heap (and compressed) if possible.
    */
+  @Retained
   public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner);
 
   /**
@@ -374,7 +406,26 @@ public interface RegionEntry {
    * @param underUpdate
    */
   public void setUpdateInProgress(final boolean underUpdate);
-  
+
+  /**
+   * Returns true if this entry has been marked for eviction for custom eviction
+   * via {@link EvictionCriteria}.
+   */
+  public boolean isMarkedForEviction();
+
+  /**
+   * Marks this entry for eviction by custom eviction via
+   * {@link EvictionCriteria}.
+   */
+  public void setMarkedForEviction();
+
+  /**
+   * Clears this entry as for eviction by custom eviction via
+   * {@link EvictionCriteria} or when an update is done after it was marked for
+   * eviction.
+   */
+  public void clearMarkedForEviction();
+
   /**
    * Event containing this RegionEntry is being passed through
    * dispatchListenerEvent for CacheListeners under RegionEntry lock. This is
@@ -451,4 +502,9 @@ public interface RegionEntry {
    */
   public void resetRefCount(NewLRUClockHand lruList);
 
+  @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
+  public Object prepareValueForCache(RegionEntryContext r, Object val, boolean isEntryUpdate);
+
+  @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
+  public Object prepareValueForCache(RegionEntryContext r, Object val, EntryEventImpl event, boolean isEntryUpdate);
 }

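Note: the RegionEntry changes above document an off-heap reference-counting discipline: accessors annotated @Retained hand the caller a reference it must release, @Unretained parameters are borrowed, and @Released parameters are consumed. A self-contained sketch of that retain/release contract (RefCountedValue is a hypothetical stand-in for MemoryChunkWithRefCount):

    import java.util.concurrent.atomic.AtomicInteger;

    class RefCountedValue {
      private final AtomicInteger refCount = new AtomicInteger(1);

      // Caller of a @Retained accessor receives a reference it must release.
      RefCountedValue retain() {
        refCount.incrementAndGet();
        return this;
      }

      void release() {
        int c = refCount.decrementAndGet();
        if (c == 0) {
          System.out.println("backing memory freed");
        } else if (c < 0) {
          throw new IllegalStateException("released more times than retained");
        }
      }

      public static void main(String[] args) {
        RefCountedValue stored = new RefCountedValue(); // owned by the "entry"
        RefCountedValue read = stored.retain();         // e.g. getValueRetain(...)
        try {
          System.out.println("using value");
        } finally {
          read.release();                               // caller's obligation
        }
        stored.release();                               // entry drops its reference
      }
    }
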
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryContext.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryContext.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryContext.java
index a5b0c6e..ead5c10 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryContext.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryContext.java
@@ -25,4 +25,8 @@ public interface RegionEntryContext extends HasCachePerfStats {
    * @return null if no compressor is assigned or available for the entry.
    */
   public Compressor getCompressor();
+  /**
+   * Returns true if region entries are stored off heap.
+   */
+  public boolean getOffHeap();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryFactory.java
index 8e0a2c0..8071e0e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryFactory.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntryFactory.java
@@ -31,4 +31,11 @@ public interface RegionEntryFactory {
    * @return return the versioned equivalent of this RegionEntryFactory
    */
   public RegionEntryFactory makeVersioned();
+  
+  /**
+   * Return the equivalent on heap version of this entry factory. This
+   * is used for creating temporary region entries that shouldn't be stored
+   * off heap.
+   */
+  public RegionEntryFactory makeOnHeap();
 }

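Note: makeOnHeap() follows the same shape as the existing makeVersioned(): a factory can hand back a sibling factory that produces entries with a different storage strategy (here, plain heap entries for temporary use). A toy sketch of that factory-variant idea with hypothetical types:

    interface EntryFactory {
      Object createEntry(Object key, Object value);
      EntryFactory makeOnHeap();            // same keys/values, heap storage
    }

    class OffHeapEntryFactory implements EntryFactory {
      public Object createEntry(Object key, Object value) {
        return "off-heap entry for " + key;
      }
      public EntryFactory makeOnHeap() {
        return new OnHeapEntryFactory();    // e.g. for temporary entries
      }
      public static void main(String[] args) {
        EntryFactory f = new OffHeapEntryFactory();
        System.out.println(f.createEntry("k", "v"));
        System.out.println(f.makeOnHeap().createEntry("k", "v"));
      }
    }

    class OnHeapEntryFactory implements EntryFactory {
      public Object createEntry(Object key, Object value) {
        return "on-heap entry for " + key;
      }
      public EntryFactory makeOnHeap() {
        return this;                        // already on heap
      }
    }
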
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
index 8dc938d..f850fb0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
@@ -97,6 +97,11 @@ public interface RegionMap extends LRUMapCallbacks {
    */
   public Collection<RegionEntry> regionEntries();
 
+  /**
+   * Returns a collection of RegionEntry instances from memory only.
+   */
+  public Collection<RegionEntry> regionEntriesInVM();
+
   public boolean containsKey(Object key);
 
   /**
@@ -105,6 +110,7 @@ public interface RegionMap extends LRUMapCallbacks {
    * @return the RegionEntry from memory or disk
    */
   public RegionEntry getEntry(Object key);
+  public RegionEntry putEntryIfAbsent(Object key, RegionEntry re);
 
   /**
    * fetches the entry from the backing ConcurrentHashMap. 
@@ -113,7 +119,16 @@ public interface RegionMap extends LRUMapCallbacks {
    */
   public RegionEntry getEntryInVM(Object key);
 
-//   /**
+  /**
+   * fetches the entry from the backing ConcurrentHashMap only if the entry
+   * is considered to be in operational data i.e. does not have
+   * isMarkedForEviction() bit set.
+   * @param key
+   * @return the RegionEntry in operational data
+   */
+  public RegionEntry getOperationalEntryInVM(Object key);
+
+  //   /**
 //    * Removes any entry associated with <code>key</code>.
 //    * Do nothing if the map has no entry for key.
 //    */
@@ -375,4 +390,6 @@ public interface RegionMap extends LRUMapCallbacks {
    * not modify an entry while it is referenced by a transaction.
    */
   public void decTxRefCount(RegionEntry e);
+
+  public void close();
 }

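Note: RegionMap gains in-memory-only accessors (regionEntriesInVM, getOperationalEntryInVM), a putEntryIfAbsent with ConcurrentMap-style semantics, and a close() hook. A minimal sketch of the putEntryIfAbsent contract, i.e. return the existing entry if one was already mapped, otherwise map the new entry and return null (SimpleRegionMap and Entry are illustrative stand-ins):

    import java.util.Collection;
    import java.util.concurrent.ConcurrentHashMap;

    class SimpleRegionMap {
      static final class Entry {
        final Object key;
        Entry(Object key) { this.key = key; }
      }

      private final ConcurrentHashMap<Object, Entry> map = new ConcurrentHashMap<>();

      Entry putEntryIfAbsent(Object key, Entry re) {
        return map.putIfAbsent(key, re);   // null means re is now the mapped entry
      }

      Collection<Entry> regionEntriesInVM() {
        return map.values();               // in this toy, everything lives in memory
      }

      void close() {
        map.clear();                       // release whatever the map holds
      }

      public static void main(String[] args) {
        SimpleRegionMap rm = new SimpleRegionMap();
        Entry first = new Entry("k");
        System.out.println(rm.putEntryIfAbsent("k", first) == null);           // true
        System.out.println(rm.putEntryIfAbsent("k", new Entry("k")) == first); // true
        rm.close();
      }
    }
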
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
index 723c574..d479d83 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
@@ -31,6 +31,11 @@ class RegionMapFactory {
     //.getDataPolicy().withPartitioning());
     if (owner.isProxy() /*|| owner instanceof PartitionedRegion*/) { // TODO enabling this causes eviction tests to fail
       return new ProxyRegionMap(owner, attrs, internalRegionArgs);
+    } else if (internalRegionArgs.isReadWriteHDFSRegion()) {
+      if (owner.getEvictionController() == null) {
+        return new HDFSRegionMapImpl(owner, attrs, internalRegionArgs);
+      }
+      return new HDFSLRURegionMap(owner, attrs, internalRegionArgs);
     //else if (owner.getEvictionController() != null && isNotPartitionedRegion) {
     } else if (owner.getEvictionController() != null ) {
       return new VMLRURegionMap(owner, attrs,internalRegionArgs);

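Note: the factory now branches on whether the region is a read/write HDFS region before the eviction-controller check, so HDFS regions get an HDFS-specific map (LRU or not). A sketch of the selection order as plain conditionals; the inputs are hypothetical flags and the fall-through default is not visible in the hunk above:

    class RegionMapChooser {
      static String choose(boolean isProxy, boolean readWriteHdfs, boolean hasEvictionController) {
        if (isProxy) {
          return "ProxyRegionMap";
        } else if (readWriteHdfs) {
          return hasEvictionController ? "HDFSLRURegionMap" : "HDFSRegionMapImpl";
        } else if (hasEvictionController) {
          return "VMLRURegionMap";
        }
        return "default map (fall-through not shown in the hunk above)";
      }

      public static void main(String[] args) {
        System.out.println(choose(false, true, false));  // HDFSRegionMapImpl
        System.out.println(choose(false, true, true));   // HDFSLRURegionMap
        System.out.println(choose(false, false, true));  // VMLRURegionMap
      }
    }
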
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
index 9afd1ec..9850f91 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionQueue.java
@@ -144,4 +144,6 @@ public interface RegionQueue
 
   //TODO:Asif: Remove this method. Added this justto make it compilable
   public void remove(int top) throws CacheException;
+
+  public void close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
index 90b3d86..23d0875 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteDestroyMessage.java
@@ -39,6 +39,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -46,6 +47,12 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES;
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_SERIALIZED_OBJECT;
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_OBJECT;
 
 /**
  * A class that specifies a destroy operation.
@@ -59,7 +66,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
  * @since 6.5
  *  
  */
-public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply {
+public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply implements OldValueImporter {
   
   private static final Logger logger = LogService.getLogger();
   
@@ -100,6 +107,7 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
   
   private byte[] oldValBytes;
   
+  @Unretained(ENTRY_EVENT_OLD_VALUE)
   private transient Object oldValObj;
 
   boolean useOriginRemote;
@@ -146,28 +154,7 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
     // added for old value if available sent over the wire for bridge servers.
     if (event.hasOldValue()) {
       this.hasOldValue = true;
-      CachedDeserializable cd = (CachedDeserializable) event.getSerializedOldValue();
-      if (cd != null) {
-        {
-          this.oldValueIsSerialized = true;
-          Object o = cd.getValue();
-          if (o instanceof byte[]) {
-            setOldValBytes((byte[])o);
-          } else {
-            // Defer serialization until toData is called.
-            setOldValObj(o);
-          }
-        }
-      } else {
-        Object old = event.getRawOldValue();
-        if (old instanceof byte[]) {
-          this.oldValueIsSerialized = false;
-          setOldValBytes((byte[]) old);
-        } else {
-          this.oldValueIsSerialized = true;
-          setOldValObj(old);
-        }
-      }
+      event.exportOldValue(this);
     }
     
   }
@@ -181,8 +168,8 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
   private void setOldValBytes(byte[] valBytes){
     this.oldValBytes = valBytes;
   }
-  
-  private void setOldValObj(Object o){
+
+  private final void setOldValObj(@Unretained(ENTRY_EVENT_OLD_VALUE) Object o) {
     this.oldValObj = o;
   }
   
@@ -214,7 +201,11 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
       this.hasOldValue = true;
       CachedDeserializable cd = (CachedDeserializable) event.getSerializedOldValue();
       if (cd != null) {
-        {
+        if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) {
+          // it is a byte[]
+          this.oldValueIsSerialized = false;
+          setOldValBytes((byte[]) ((StoredObject) cd).getDeserializedForReading());
+        } else {
           this.oldValueIsSerialized = true;
           Object o = cd.getValue();
           if (o instanceof byte[]) {
@@ -327,6 +318,7 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
                                           event,
                                           expectedOldValue, processorType,
                                           useOriginRemote, possibleDuplicate);
+    m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
     Set failures =r.getDistributionManager().putOutgoing(m); 
     if (failures != null && failures.size() > 0 ) {
       throw new RemoteOperationException(LocalizedStrings.RemoteDestroyMessage_FAILED_SENDING_0.toLocalizedString(m));
@@ -359,8 +351,9 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
       ((KeyWithRegionContext)this.key).setRegionContext(r);
     }
     EntryEventImpl event = null;
+    try {
     if (this.bridgeContext != null) {
-      event = new EntryEventImpl(r, getOperation(), getKey(), null/*newValue*/,
+      event = EntryEventImpl.create(r, getOperation(), getKey(), null/*newValue*/,
           getCallbackArg(), false/*originRemote*/, eventSender, 
           true/*generateCallbacks*/);
       event.setContext(this.bridgeContext);
@@ -376,7 +369,7 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
       }
     } // bridgeContext != null
     else {
-      event = new EntryEventImpl(
+      event = EntryEventImpl.create(
         r,
         getOperation(),
         getKey(),
@@ -431,6 +424,11 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
             new ReplyException(e), getReplySender(dm), r.isInternalRegion());
       }
     return false;
+    } finally {
+      if (event != null) {
+        event.release();
+      }
+    }
   }
 
   public int getDSFID() {
@@ -488,16 +486,9 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
 
     // this will be on wire for cqs old value generations.
     if (this.hasOldValue){
-      //out.writeBoolean(this.hasOldValue);
-      // below boolean is not strictly required, but this is for compatibility
-      // with SQLFire code which writes as byte here to indicate whether
-      // oldValue is an object, serialized object or byte[]
       out.writeByte(this.oldValueIsSerialized ? 1 : 0);
-      if (getOldValueBytes() != null) {
-        DataSerializer.writeByteArray(getOldValueBytes(), out);
-      } else {
-        DataSerializer.writeObjectAsByteArray(getOldValObj(), out);
-      }
+      byte policy = DistributedCacheOperation.valueIsToDeserializationPolicy(oldValueIsSerialized);
+      DistributedCacheOperation.writeValue(policy, getOldValObj(), getOldValueBytes(), out);
     }
     DataSerializer.writeObject(this.expectedOldValue, out);
     DataSerializer.writeObject(this.versionTag, out);
@@ -577,6 +568,47 @@ public class RemoteDestroyMessage extends RemoteOperationMessageWithDirectReply
     return this.cbArg;
   }
   
+  @Override
+  public boolean prefersOldSerialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isUnretainedOldReferenceOk() {
+    return true;
+  }
+
+  @Override
+  public boolean isCachedDeserializableValueOk() {
+    return false;
+  }
+  
+  private void setOldValueIsSerialized(boolean isSerialized) {
+    if (isSerialized) {
+      if (CachedDeserializableFactory.preferObject()) {
+        this.oldValueIsSerialized = true; //VALUE_IS_OBJECT;
+      } else {
+        // Defer serialization until toData is called.
+        this.oldValueIsSerialized = true; //VALUE_IS_SERIALIZED_OBJECT;
+      }
+    } else {
+      this.oldValueIsSerialized = false; //VALUE_IS_BYTES;
+    }
+  }
+  
+  @Override
+  public void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized) {
+    setOldValueIsSerialized(isSerialized);
+    // Defer serialization until toData is called.
+    setOldValObj(ov);
+  }
+
+  @Override
+  public void importOldBytes(byte[] ov, boolean isSerialized) {
+    setOldValueIsSerialized(isSerialized);
+    setOldValBytes(ov);
+  }
+
   public static class DestroyReplyMessage extends ReplyMessage {
     
     private static final byte HAS_VERSION = 0x01;

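Note: RemoteDestroyMessage now implements OldValueImporter, so the inline instanceof chain that was deleted is replaced by event.exportOldValue(this): the event decides what form the old value is in and pushes it into the message through the importer callbacks. A self-contained sketch of that exporter/importer handshake (all names here are illustrative stand-ins, not the Geode classes):

    class OldValueExport {
      interface Importer {
        boolean prefersOldSerialized();   // consulted by the real exporter; unused in this toy
        void importOldBytes(byte[] ov, boolean isSerialized);
        void importOldObject(Object ov, boolean isSerialized);
      }

      static final class Message implements Importer {
        byte[] oldValBytes;
        Object oldValObj;
        boolean oldValueIsSerialized;

        public boolean prefersOldSerialized() { return true; }

        public void importOldBytes(byte[] ov, boolean isSerialized) {
          this.oldValueIsSerialized = isSerialized;
          this.oldValBytes = ov;
        }

        public void importOldObject(Object ov, boolean isSerialized) {
          this.oldValueIsSerialized = isSerialized;
          this.oldValObj = ov;            // serialization deferred until the wire write
        }
      }

      // Stand-in for EntryEventImpl.exportOldValue(importer).
      static void exportOldValue(Object oldValue, Importer importer) {
        if (oldValue instanceof byte[]) {
          importer.importOldBytes((byte[]) oldValue, false);
        } else {
          importer.importOldObject(oldValue, true);
        }
      }

      public static void main(String[] args) {
        Message m = new Message();
        exportOldValue("old-value", m);
        System.out.println(m.oldValueIsSerialized + " / " + m.oldValObj);
      }
    }
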
