geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [14/51] [partial] incubator-geode git commit: SGA #2
Date Fri, 03 Jul 2015 19:21:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllGatewayHubsRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllGatewayHubsRequest.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllGatewayHubsRequest.java
new file mode 100644
index 0000000..6d98d5d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/ShutdownAllGatewayHubsRequest.java
@@ -0,0 +1,52 @@
+package com.gemstone.gemfire.internal.admin.remote;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyMessage;
+
+/**
+ * 
+ * This ShutdownAllGatewayHubsRequest just reply with ignored bit true so that
+ * old version member's request will be ignored and no exception will be thrown.
+ * 
+ * From 9.0 old wan support is removed. Ideally ShutdownAllGatewayHubsRequest
+ * should be removed but it it there for rolling upgrade support when request
+ * come from old version member to shut down hubs.
+ * 
+ * @author kbachhav
+ * @since 9.0
+ *
+ */
+public class ShutdownAllGatewayHubsRequest extends DistributionMessage {
+  
+  protected int rpid;
+
+  @Override
+  public int getDSFID() {
+    return SHUTDOWN_ALL_GATEWAYHUBS_REQUEST;
+  }
+
+  @Override
+  public int getProcessorType() {
+    return DistributionManager.STANDARD_EXECUTOR;
+  }
+
+  @Override
+  protected void process(DistributionManager dm) {
+    ReplyMessage.send(getSender(), this.rpid, null, dm, true /*ignored*/, false, false);
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.rpid = in.readInt();
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    out.writeInt(this.rpid);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
index e3120f8..b394d10 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
@@ -14,6 +14,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
@@ -24,7 +25,11 @@ import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
 import com.gemstone.gemfire.internal.cache.lru.LRUStatistics;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
@@ -33,9 +38,10 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewa
 import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
 
 public abstract class AbstractBucketRegionQueue extends BucketRegion {
-  private static final Logger logger = LogService.getLogger();
+  protected static final Logger logger = LogService.getLogger();
   
   /**
     * The maximum size of this single queue before we start blocking puts
@@ -244,8 +250,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
             + key, rde);
       }
     } finally {
-      //merge42180: are we considering offheap in cedar. Comment freeOffHeapReference intentionally
-      //event.freeOffHeapReferences();
+      event.release();
     }
 
     this.notifyEntriesRemoved();
@@ -301,16 +306,16 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
                 logger.debug("For bucket {} , enqueing event {} caused exception", getId(), event, e);
               }
             } finally {
-              /*if (event != null) {
-                event.release();  // merge44873: this is offheap related change from cheetah
-              }*/
+              if (event != null) {
+                event.release();
+              }
             }
           }
           } finally {
             if (!tempQueue.isEmpty()) {
-              /*for (GatewaySenderEventImpl e: tempQueue) {
-                e.release(); // merge44873: this is offheap related change from cheetah
-              }*/
+              for (GatewaySenderEventImpl e: tempQueue) {
+                e.release();
+              }
               tempQueue.clear();
             }
             getInitializationLock().writeLock().unlock();
@@ -364,7 +369,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
       //if (ov instanceof GatewaySenderEventImpl) {
       //  ((GatewaySenderEventImpl)ov).release();
       //}
-     
+       GatewaySenderEventImpl.release(event.getRawOldValue());
     }
     return success;
     
@@ -378,6 +383,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     //if (rov instanceof GatewaySenderEventImpl) {
     //  ((GatewaySenderEventImpl) rov).release();
     //}
+	GatewaySenderEventImpl.release(event.getRawOldValue());
   }
 
 
@@ -429,21 +435,31 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     // is never stored offheap so this EntryEventImpl values will never be off-heap.
     // So the value that ends up being stored in this region is a GatewaySenderEventImpl
     // which may have a reference to a value stored off-heap.
-    EntryEventImpl event = new EntryEventImpl(this, Operation.UPDATE, key,
+    EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
         value, null, false, getMyId());
     // here avoiding unnecessary validations of key, value. Readniness check
     // will be handled in virtualPut. avoiding extractDelta as this will be new
     // entry everytime
     // EntryEventImpl event = getPartitionedRegion().newUpdateEntryEvent(key,
     // value, null);
-    //event.copyOffHeapToHeap();
+    event.copyOffHeapToHeap();
 
     if (logger.isDebugEnabled()) {
       logger.debug("Value : {}", event.getRawNewValue());
     }
     waitIfQueueFull();
-
+    
+    int sizeOfHdfsEvent = -1;
     try {
+      if (this instanceof HDFSBucketRegionQueue) {
+        // need to fetch the size before event is inserted in queue.
+        // fix for #50016
+        if (this.getBucketAdvisor().isPrimary()) {
+          HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
+          sizeOfHdfsEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
+        }
+      }
+      
       didPut = virtualPut(event, false, false, null, false, startPut, true);
       
       checkReadiness();
@@ -454,10 +470,9 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
         throw new ForceReattemptException("Bucket moved", rde);
       }
     } finally {
-      //if (!didPut) {
-      //  GatewaySenderEventImpl gwVal = (GatewaySenderEventImpl) value;
-      //  gwVal.release();
-      //}
+      if (!didPut) {
+        GatewaySenderEventImpl.release(value);
+      }
     }
     
     //check again if the key exists in failedBatchRemovalMessageKeys, 
@@ -467,38 +482,38 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
       destroyKey(key);
       didPut = false;
     } else {
-      addToEventQueue(key, didPut, event);
+      addToEventQueue(key, didPut, event, sizeOfHdfsEvent);
     }
     return didPut;
   }
+  @Override
+  public void closeEntries() {
+    OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
+      @Override
+      public void run() {
+        AbstractBucketRegionQueue.super.closeEntries();
+      }
+    });
+    clearQueues();
+    
+  }
   
-//  @Override
-//  public void closeEntries() {
-//    OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
-//      @Override
-//      public void run() {
-//        AbstractBucketRegionQueue.super.closeEntries();
-//      }
-//    });
-//    clearQueues();
-//    
-//  }
-//  
-//  @Override
-//  public Set<VersionSource> clearEntries(final RegionVersionVector rvv) {
-//    final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>();
-//    OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
-//      @Override
-//      public void run() {
-//        result.set(AbstractBucketRegionQueue.super.clearEntries(rvv));
-//      }
-//    });
-//    clearQueues();
-//    return result.get();
-//  }
+  @Override
+  public Set<VersionSource> clearEntries(final RegionVersionVector rvv) {
+    final AtomicReference<Set<VersionSource>> result = new AtomicReference<Set<VersionSource>>();
+    OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() {
+      @Override
+      public void run() {
+        result.set(AbstractBucketRegionQueue.super.clearEntries(rvv));
+      }
+    });
+    clearQueues();
+    return result.get();
+  }
   
   protected abstract void clearQueues();
-  protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event);
+  protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, 
+      int sizeOfHdfsEvent);
   
   @Override
   public void afterAcquiringPrimaryState() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java
index a237b7c..a10d503 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java
@@ -9,6 +9,8 @@ package com.gemstone.gemfire.internal.cache;
 
 import java.io.PrintStream;
 import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -32,6 +34,7 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
 import joptsimple.internal.Strings;
 
 /**
@@ -77,6 +80,8 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
   private String compressorClassName;
   private Compressor compressor;
 
+  private boolean offHeap;
+  
   /**
    * Records the version vector of what has been persisted to disk.
    * This may lag behind the version vector of what is in memory, because
@@ -137,6 +142,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
       this.versionVector = drv.getRegionVersionVector();
       this.compressorClassName = drv.getCompressorClassName();
       this.compressor = drv.getCompressor();
+      this.offHeap = drv.getOffHeap();
       if (drv instanceof PlaceHolderDiskRegion) {
         this.setRVVTrusted(((PlaceHolderDiskRegion) drv).getRVVTrusted());
       }
@@ -218,6 +224,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     this.versionVector = drv.getRegionVersionVector();
     this.compressorClassName = drv.getCompressorClassName();
     this.compressor = drv.getCompressor();
+    this.offHeap = drv.getOffHeap();
   }
   
   //////////////////////  Instance Methods  //////////////////////
@@ -257,7 +264,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
                         float loadFactor, boolean statisticsEnabled,
                         boolean isBucket, EnumSet<DiskRegionFlag> flags,
                         String partitionName, int startingBucketId, 
-                        String compressorClassName) {
+                        String compressorClassName, boolean offHeap) {
     this.lruAlgorithm = lruAlgorithm;
     this.lruAction = lruAction;
     this.lruLimit = lruLimit;
@@ -273,6 +280,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     this.partitionName = partitionName;
     this.startingBucketId = startingBucketId;
     this.compressorClassName = compressorClassName;
+    this.offHeap = offHeap;
     if (!ds.isOffline()) {
       createCompressorFromClassName();
     }
@@ -519,6 +527,21 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     if(isReadyForRecovery()) {
       ds.updateDiskRegion(this);
       this.entriesMapIncompatible = false;
+      if (this.entries != null) {
+        CustomEntryConcurrentHashMap<Object, Object> other = ((AbstractRegionMap)this.entries)._getMap();
+        Iterator<Map.Entry<Object, Object>> it = other
+            .entrySetWithReusableEntries().iterator();
+        while (it.hasNext()) {
+          Map.Entry<Object, Object> me = it.next();
+          RegionEntry oldRe = (RegionEntry)me.getValue();
+          if (oldRe instanceof OffHeapRegionEntry) {
+            ((OffHeapRegionEntry) oldRe).release();
+          } else {
+            // no need to keep iterating; they are all either off heap or on heap.
+            break;
+          }
+        }
+      }
       this.entries = null;
       this.readyForRecovery = false;
     }
@@ -746,6 +769,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     msg += " -concurrencyLevel=" + getConcurrencyLevel()
       + " -initialCapacity=" + getInitialCapacity()
       + " -loadFactor=" + getLoadFactor()
+      + " -offHeap=" + getOffHeap()
       + " -compressor=" + (getCompressorClassName() == null ? "none" : getCompressorClassName())
       + " -statisticsEnabled=" + getStatisticsEnabled();
     if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY)) {
@@ -787,6 +811,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     sb.append("-concurrencyLevel=" + getConcurrencyLevel()); sb.append(lineSeparator);
     sb.append("-initialCapacity=" + getInitialCapacity()); sb.append(lineSeparator);
     sb.append("-loadFactor=" + getLoadFactor()); sb.append(lineSeparator);
+    sb.append("-offHeap=" + getOffHeap()); sb.append(lineSeparator);
     sb.append("-compressor=" + (getCompressorClassName() == null ? "none" : getCompressorClassName())); sb.append(lineSeparator);
     sb.append("-statisticsEnabled=" + getStatisticsEnabled()); sb.append(lineSeparator);
     
@@ -860,6 +885,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     msg.append("\n\tconcurrencyLevel=").append(getConcurrencyLevel());
     msg.append("\n\tinitialCapacity=").append(getInitialCapacity());
     msg.append("\n\tloadFactor=").append(getLoadFactor());
+    msg.append("\n\toffHeap=").append(getOffHeap());
     msg.append("\n\tstatisticsEnabled=").append(getStatisticsEnabled());
     
     msg.append("\n\tdrId=").append(getId());
@@ -936,6 +962,11 @@ public abstract class AbstractDiskRegion implements DiskRegionView {
     return this.compressor;
   }
   
+  @Override
+  public boolean getOffHeap() {
+    return this.offHeap;
+  }
+
   public CachePerfStats getCachePerfStats() {
     return this.ds.getCache().getCachePerfStats();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java
index 7d48082..5d6ce4d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java
@@ -6,6 +6,11 @@
  *=========================================================================
  */
 package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 /**
  * 
  * @author sbawaska
@@ -38,9 +43,25 @@ public abstract class AbstractDiskRegionEntry
   @Override
   public void setValueWithContext(RegionEntryContext context, Object value) {
     _setValue(value);
-    //_setValue(compress(context,value));  // compress is now called in AbstractRegionMap.prepareValueForCache
+    if (value != null && context != null && (this instanceof OffHeapRegionEntry) 
+        && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) {
+      ((OffHeapRegionEntry)this).release();
+      ((LocalRegion)context).checkReadiness();
+    }
   }
   
   // Do not add any instances fields to this class.
   // Instead add them to the DISK section of LeafRegionEntry.cpp.
+
+  @Override
+  public void handleValueOverflow(RegionEntryContext context) {
+    if (context instanceof BucketRegionQueue || context instanceof SerialGatewaySenderQueue.SerialGatewaySenderQueueMetaRegion) {
+      GatewaySenderEventImpl.release(this._getValue()); // OFFHEAP _getValue ok
+    }
+  }
+  @Override
+  public void afterValueOverflow(RegionEntryContext context) {
+    //NO OP
+    //Overridden in sqlf RegionEntry
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
index e4fb873..c3d99b8 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
@@ -19,6 +19,7 @@ import com.gemstone.gemfire.cache.EvictionAction;
 import com.gemstone.gemfire.cache.EvictionAlgorithm;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
 import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
 import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
 import com.gemstone.gemfire.internal.cache.lru.HeapLRUCapacityController;
@@ -34,6 +35,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.size.ReflectionSingleObjectSizer;
 
 /**
@@ -70,8 +72,9 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap {
       ea = ((LocalRegion)owner).getEvictionAttributes().getAlgorithm();
       ec = ((LocalRegion)owner).getEvictionController();
     } else if (owner instanceof PlaceHolderDiskRegion) {
-      ea = ((PlaceHolderDiskRegion)owner).getActualLruAlgorithm();
-      ec = ((PlaceHolderDiskRegion)owner).getEvictionAttributes().createEvictionController(null);
+      PlaceHolderDiskRegion phdr = (PlaceHolderDiskRegion)owner;
+      ea = phdr.getActualLruAlgorithm();
+      ec = phdr.getEvictionAttributes().createEvictionController(null, phdr.getOffHeap());
     } else {
       throw new IllegalStateException("expected LocalRegion or PlaceHolderDiskRegion");
     }
@@ -166,9 +169,15 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap {
     // make sure this cached deserializable is still in the entry
     // @todo what if a clear is done and this entry is no longer in the region?
     {
-      Object curVal = le._getValue();
+      Object curVal = le._getValue(); // OFFHEAP: _getValue ok
       if (curVal != cd) {
+        if (cd instanceof StoredObject) {
+          if (!cd.equals(curVal)) {
+            return false;
+          }
+        } else {
           return false;
+        }
       }
     }
     // TODO:KIRK:OK if (le.getValueInVM((RegionEntryContext) _getOwnerObject()) != cd) return false;
@@ -536,6 +545,20 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap {
     // reset the tx thread local
  } 
   
+  private boolean mustEvict() {
+    LocalRegion owner = _getOwner();
+    InternalResourceManager resourceManager = owner.getCache().getResourceManager();
+    
+    final boolean monitorStateIsEviction;
+    if (!owner.getAttributes().getOffHeap()) {
+      monitorStateIsEviction = resourceManager.getHeapMonitor().getState().isEviction();
+    } else {
+      monitorStateIsEviction = resourceManager.getOffHeapMonitor().getState().isEviction();
+    }
+    
+    return monitorStateIsEviction && this.sizeInVM() > 0;
+  }
+  
   public final int centralizedLruUpdateCallback() {
     final boolean isDebugEnabled_LRU = logger.isTraceEnabled(LogMarker.LRU);
     
@@ -550,8 +573,7 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap {
     }
     LRUStatistics stats = _getLruList().stats();
     try {
-      while (_getOwner().getCache().getHeapEvictor().mustEvict()
-          && this.sizeInVM() > 0 && evictedBytes == 0) {
+      while (mustEvict() && evictedBytes == 0) {
         LRUEntry removalEntry = (LRUEntry)_getLruList().getLRUEntry();
         if (removalEntry != null) {
           evictedBytes = evictEntry(removalEntry, stats);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java
index 7e39f8c..3a31e90 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java
@@ -13,6 +13,7 @@ import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 
 /**
  * Abstract implementation class of RegionEntry interface.
@@ -67,9 +68,16 @@ public abstract class AbstractOplogDiskRegionEntry
   }
   
   @Override
+  @Retained
+  public final Object getValueRetain(RegionEntryContext context) {   
+    return Helper.faultInValueRetain(this, (LocalRegion) context);
+  }
+  
+  @Override
   public final Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) {
     return Helper.getValueInVMOrDiskWithoutFaultIn(this, owner);
   }
+  @Retained
   @Override
   public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) {
     return Helper.getValueOffHeapOrDiskWithoutFaultIn(this, owner);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
index b9b7b26..1dcd918 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.logging.log4j.Logger;
@@ -36,6 +37,7 @@ import com.gemstone.gemfire.cache.CacheLoaderException;
 import com.gemstone.gemfire.cache.CacheStatistics;
 import com.gemstone.gemfire.cache.CacheWriter;
 import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
 import com.gemstone.gemfire.cache.CustomExpiry;
 import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.DiskWriteAttributes;
@@ -43,6 +45,7 @@ import com.gemstone.gemfire.cache.EntryExistsException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.EvictionAttributes;
 import com.gemstone.gemfire.cache.EvictionAttributesMutator;
+import com.gemstone.gemfire.cache.EvictionCriteria;
 import com.gemstone.gemfire.cache.ExpirationAction;
 import com.gemstone.gemfire.cache.ExpirationAttributes;
 import com.gemstone.gemfire.cache.MembershipAttributes;
@@ -91,6 +94,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.util.ArrayUtils;
 import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
+import com.google.common.util.concurrent.Service.State;
 
 /**
  * Takes care of RegionAttributes, AttributesMutator, and some no-brainer method
@@ -195,6 +199,12 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
 
   protected boolean enableAsyncConflation;
 
+  /**
+   * True if this region uses off-heap memory; otherwise false (default)
+   * @since 9.0
+   */
+  protected boolean offHeap;
+
   protected boolean cloningEnable = false;
 
   protected DiskWriteAttributes diskWriteAttributes;
@@ -221,6 +231,8 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
 
   protected EvictionAttributesImpl evictionAttributes = new EvictionAttributesImpl();
 
+  protected CustomEvictionAttributes customEvictionAttributes;
+
   /** The membership attributes defining required roles functionality */
   protected MembershipAttributes membershipAttributes;
 
@@ -243,6 +255,10 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
   
   protected String poolName;
   
+  protected String hdfsStoreName;
+  
+  protected boolean hdfsWriteOnly;
+  
   protected Compressor compressor;
   
   /**
@@ -883,6 +899,16 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
     return this.subscriptionAttributes;
   }
   
+  @Override
+  public final String getHDFSStoreName() {
+    return this.hdfsStoreName;
+  }
+  
+  @Override
+  public final boolean getHDFSWriteOnly() {
+    return this.hdfsWriteOnly;
+  }
+  
   /**
    * Get IndexManger for region
    */
@@ -1700,6 +1726,16 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
     this.dataPolicy = attrs.getDataPolicy(); // do this one first
     this.scope = attrs.getScope();
     
+    this.offHeap = attrs.getOffHeap();
+
+    // fix bug #52033 by invoking setOffHeap now (localMaxMemory may now be the temporary placeholder for off-heap until DistributedSystem is created
+    // found non-null PartitionAttributes and offHeap is true so let's setOffHeap on PA now
+    PartitionAttributes<?, ?> pa = attrs.getPartitionAttributes();
+    if (this.offHeap && pa != null) {
+      PartitionAttributesImpl impl = (PartitionAttributesImpl)pa;
+      impl.setOffHeap(this.offHeap);
+    }
+
     this.evictionAttributes = new EvictionAttributesImpl((EvictionAttributesImpl)attrs
         .getEvictionAttributes());
     if (attrs.getPartitionAttributes() != null
@@ -1721,8 +1757,9 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
     if (this.evictionAttributes != null
         && !this.evictionAttributes.getAlgorithm().isNone()) {
       this.setEvictionController(this.evictionAttributes
-          .createEvictionController(this));
+          .createEvictionController(this, attrs.getOffHeap()));
     }
+    this.customEvictionAttributes = attrs.getCustomEvictionAttributes();
     storeCacheListenersField(attrs.getCacheListeners());
     assignCacheLoader(attrs.getCacheLoader());
     assignCacheWriter(attrs.getCacheWriter());
@@ -1781,6 +1818,9 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
             + "when multiuser-authentication is true.");
       }
     }
+    this.hdfsStoreName = attrs.getHDFSStoreName();
+    this.hdfsWriteOnly = attrs.getHDFSWriteOnly();
+
     this.diskStoreName = attrs.getDiskStoreName();
     this.isDiskSynchronous = attrs.isDiskSynchronous();
     if (this.diskStoreName == null) {
@@ -1845,11 +1885,52 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
     return this.evictionAttributes;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public CustomEvictionAttributes getCustomEvictionAttributes() {
+    return this.customEvictionAttributes;
+  }
+
   public EvictionAttributesMutator getEvictionAttributesMutator()
   {
     return this.evictionAttributes;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public CustomEvictionAttributes setCustomEvictionAttributes(long newStart,
+      long newInterval) {
+    checkReadiness();
+
+    if (this.customEvictionAttributes == null) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.AbstractRegion_NO_CUSTOM_EVICTION_SET
+              .toLocalizedString(getFullPath()));
+    }
+
+    if (newStart == 0) {
+      newStart = this.customEvictionAttributes.getEvictorStartTime();
+    }
+    this.customEvictionAttributes = new CustomEvictionAttributesImpl(
+        this.customEvictionAttributes.getCriteria(), newStart, newInterval,
+        newStart == 0 && newInterval == 0);
+
+//    if (this.evService == null) {
+//      initilializeCustomEvictor();
+//    } else {// we are changing the earlier one which is already started.
+//      EvictorService service = getEvictorTask();
+//      service.changeEvictionInterval(newInterval);
+//      if (newStart != 0)
+//        service.changeStartTime(newStart);
+//    }
+
+    return this.customEvictionAttributes;
+  }
+  
   public void setEvictionController(LRUAlgorithm evictionController)
   {
     this.evictionController = evictionController;
@@ -1987,10 +2068,98 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
   }
   
   /**
-   * @since 8.1
-   */
+  * @since 8.1
+  * property used to find region operations that reach out to HDFS multiple times
+  */
   @Override
   public ExtensionPoint<Region<?, ?>> getExtensionPoint() {
     return extensionPoint;
   }
+
+  public boolean getOffHeap() {
+    return this.offHeap;
+  }
+  /**
+   * property used to find region operations that reach out to HDFS multiple times
+   */
+  private static final boolean DEBUG_HDFS_CALLS = Boolean.getBoolean("DebugHDFSCalls");
+
+  /**
+   * throws exception if region operation goes out to HDFS multiple times
+   */
+  private static final boolean THROW_ON_MULTIPLE_HDFS_CALLS = Boolean.getBoolean("throwOnMultipleHDFSCalls");
+
+  private ThreadLocal<CallLog> logHDFSCalls = DEBUG_HDFS_CALLS ? new ThreadLocal<CallLog>() : null;
+
+  public void hdfsCalled(Object key) {
+    if (!DEBUG_HDFS_CALLS) {
+      return;
+    }
+    logHDFSCalls.get().addStack(new Throwable());
+    logHDFSCalls.get().setKey(key);
+  }
+  public final void operationStart() {
+    if (!DEBUG_HDFS_CALLS) {
+      return;
+    }
+    if (logHDFSCalls.get() == null) {
+      logHDFSCalls.set(new CallLog());
+      //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationStart", new Throwable());
+    } else {
+      logHDFSCalls.get().incNestedCall();
+      //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:incNestedCall:", new Throwable());
+    }
+  }
+  public final void operationCompleted() {
+    if (!DEBUG_HDFS_CALLS) {
+      return;
+    }
+    //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationCompleted", new Throwable());
+    if (logHDFSCalls.get() != null && logHDFSCalls.get().decNestedCall() < 0) {
+      logHDFSCalls.get().assertCalls();
+      logHDFSCalls.set(null);
+    }
+  }
+
+  public static class CallLog {
+    private List<Throwable> stackTraces = new ArrayList<Throwable>();
+    private Object key;
+    private int nestedCall = 0;
+    public void incNestedCall() {
+      nestedCall++;
+    }
+    public int decNestedCall() {
+      return --nestedCall;
+    }
+    public void addStack(Throwable stack) {
+      this.stackTraces.add(stack);
+    }
+    public void setKey(Object key) {
+      this.key = key;
+    }
+    public void assertCalls() {
+      if (stackTraces.size() > 1) {
+        Throwable firstTrace = new Throwable();
+        Throwable lastTrace = firstTrace;
+        for (Throwable t : this.stackTraces) {
+          lastTrace.initCause(t);
+          lastTrace = t;
+        }
+        if (THROW_ON_MULTIPLE_HDFS_CALLS) {
+          throw new RuntimeException("SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace);
+        } else {
+          InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace);
+        }
+      }
+    }
+  }
+
+  public EvictionCriteria getEvictionCriteria() {
+    EvictionCriteria criteria = null;
+    if (this.customEvictionAttributes != null
+        && !this.customEvictionAttributes.isEvictIncoming()) {
+      criteria = this.customEvictionAttributes.getCriteria();
+    }
+    return criteria;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index d7f3963..809996b 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -9,7 +9,11 @@
 package com.gemstone.gemfire.internal.cache;
 
 import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.logging.log4j.Logger;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE;
 
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.InvalidDeltaException;
@@ -41,11 +45,24 @@ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.lang.StringUtils;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
+import com.gemstone.gemfire.internal.offheap.OffHeapCachedDeserializable;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.ChunkType;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.GemFireChunk;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.GemFireChunkType;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 import com.gemstone.gemfire.internal.util.Versionable;
 import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
@@ -54,7 +71,8 @@ import com.gemstone.gemfire.pdx.PdxInstance;
 import com.gemstone.gemfire.pdx.PdxSerializable;
 import com.gemstone.gemfire.pdx.PdxSerializationException;
 import com.gemstone.gemfire.pdx.PdxSerializer;
-
+import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
+import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
 
 /**
  * Abstract implementation class of RegionEntry interface.
@@ -100,10 +118,13 @@ public abstract class AbstractRegionEntry implements RegionEntry,
   protected static final long IN_USE_BY_TX = 0x40L<<56;
 
 
+  protected static final long MARKED_FOR_EVICTION = 0x80L<<56;
 //  public Exception removeTrace; // debugging hot loop in AbstractRegionMap.basicPut()
   
-  protected AbstractRegionEntry(RegionEntryContext context, Object value) {
-    setValue(context,AbstractRegionMap.prepareValueForCache(context, value),false);
+  protected AbstractRegionEntry(RegionEntryContext context,
+      @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object value) {
+    
+    setValue(context,this.prepareValueForCache(context, value, false),false);
 //    setLastModified(System.currentTimeMillis()); [bruce] this must be set later so we can use ==0 to know this is a new entry in checkForConflicts
   }
   
@@ -137,7 +158,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       return true;
 
     } finally {
-      if (isRemoved() && !isTombstone()) {
+      if (isRemoved() && !isTombstone() && !event.isEvicted()) {
         // Phase 2 of region entry removal is done here. The first phase is done
         // by the RegionMap. It is unclear why this code is needed. ARM destroy
         // does this also and we are now doing it as phase3 of the ARM destroy.
@@ -226,7 +247,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
   
 
   @Override
-  public void setValueWithTombstoneCheck(Object v, EntryEvent e) throws RegionClearedException {
+  public void setValueWithTombstoneCheck(@Unretained Object v, EntryEvent e) throws RegionClearedException {
     if (v == Token.TOMBSTONE) {
       makeTombstone((LocalRegion)e.getRegion(), ((EntryEventImpl)e).getVersionTag());
     } else {
@@ -266,15 +287,14 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     return getValueAsToken() == Token.REMOVED_PHASE2;
   }
   
-  // RegionEntry.fillInValue(...)
   public boolean fillInValue(LocalRegion region,
-                             InitialImageOperation.Entry dst,
+                             @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry dst,
                              ByteArrayDataInput in,
                              DM mgr)
   {
     dst.setSerialized(false); // starting default value
 
-    final Object v;
+    @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) final Object v;
     if (isTombstone()) {
       v = Token.TOMBSTONE;
     } else {
@@ -300,46 +320,59 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     }
     else if (v instanceof CachedDeserializable) {
       // don't serialize here if it is not already serialized
-      {
-        Object tmp = ((CachedDeserializable)v).getValue();
-        if (tmp instanceof byte[]) {
-          byte[] bb = (byte[])tmp;
-          dst.value = bb;
-        }
-        else if (isEagerDeserialize && tmp instanceof byte[][]) {
-          // optimize for byte[][] since it will need to be eagerly deserialized
-          // for SQLFabric
-          dst.value = tmp;
-          dst.setEagerDeserialize();
-        }
-        else {
-          try {
-            HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
-            BlobHelper.serializeTo(tmp, hdos);
-            hdos.trim();
-            dst.value = hdos;
-          } catch (IOException e) {
-            RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
-            e2.initCause(e);
-            throw e2;
+//      if(v instanceof ByteSource && CachedDeserializableFactory.preferObject()) {
+//        // For SQLFire we prefer eager deserialized
+//        dst.setEagerDeserialize();         
+//      }
+      
+      if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
+        dst.value = ((StoredObject) v).getDeserializedForReading();
+      } else {
+        /*if (v instanceof ByteSource && CachedDeserializableFactory.preferObject()) {
+          dst.value = v;
+        } else */ {
+          Object tmp = ((CachedDeserializable) v).getValue();
+          if (tmp instanceof byte[]) {
+            byte[] bb = (byte[]) tmp;
+            dst.value = bb;
+          } else {
+            try {
+              HeapDataOutputStream hdos = new HeapDataOutputStream(
+                  Version.CURRENT);
+              BlobHelper.serializeTo(tmp, hdos);
+              hdos.trim();
+              dst.value = hdos;
+            } catch (IOException e) {
+              RuntimeException e2 = new IllegalArgumentException(
+                  LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING
+                      .toLocalizedString());
+              e2.initCause(e);
+              throw e2;
+            }
           }
+          dst.setSerialized(true);
         }
-        dst.setSerialized(true);
       }
     }
     else if (v instanceof byte[]) {
       dst.value = v;
     }
-    else if (isEagerDeserialize && v instanceof byte[][]) {
-      // optimize for byte[][] since it will need to be eagerly deserialized
-      // for SQLFabric
-      dst.value = v;
+    else { 
+      Object preparedValue = v;
+      if (preparedValue != null) {
+        preparedValue = prepareValueForGII(preparedValue);
+        if (preparedValue == null) {
+          return false;
+        }
+      }
+    if (CachedDeserializableFactory.preferObject()) {
+      dst.value = preparedValue;
       dst.setEagerDeserialize();
     }
     else {
       try {
         HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
-        BlobHelper.serializeTo(v, hdos);
+        BlobHelper.serializeTo(preparedValue, hdos);
         hdos.trim();
         dst.value = hdos;
         dst.setSerialized(true);
@@ -349,16 +382,32 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         throw e2;
       }
     }
+    }
     return true;
   }
   
+  /**
+   * To fix bug 49901 if v is a GatewaySenderEventImpl then make
+   * a heap copy of it if it is offheap.
+   * @return the value to provide to the gii request; null if no value should be provided.
+   */
+  public static Object prepareValueForGII(Object v) {
+    assert v != null;
+    if (v instanceof GatewaySenderEventImpl) {
+      return ((GatewaySenderEventImpl) v).makeHeapCopyIfOffHeap();
+    } else {
+      return v;
+    }
+  }
+  
   public boolean isOverflowedToDisk(LocalRegion r, DistributedRegion.DiskPosition dp) {
     return false;
   }
 
   @Override
   public Object getValue(RegionEntryContext context) {
-    Object result = _getValueUse(context, true);
+    SimpleMemoryAllocatorImpl.createReferenceCountOwner();
+    @Retained Object result = _getValueRetain(context, true);
     //Asif: If the thread is an Index Creation Thread & the value obtained is 
     //Token.REMOVED , we can skip  synchronization block. This is required to prevent
     // the dead lock caused if an Index Update Thread has gone into a wait holding the
@@ -373,6 +422,21 @@ public abstract class AbstractRegionEntry implements RegionEntry,
 //    }
     
     if (Token.isRemoved(result)) {
+      SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+      return null;
+    } else {
+      result = OffHeapHelper.copyAndReleaseIfNeeded(result); // sqlf does not dec ref count in this call
+      SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+      setRecentlyUsed();
+      return result;
+    }
+  }
+  
+  @Override
+  @Retained
+  public Object getValueRetain(RegionEntryContext context) {
+    @Retained Object result = _getValueRetain(context, true);
+    if (Token.isRemoved(result)) {
       return null;
     } else {
       setRecentlyUsed();
@@ -381,7 +445,8 @@ public abstract class AbstractRegionEntry implements RegionEntry,
   }
   
   @Override
-  public void setValue(RegionEntryContext context, Object value) throws RegionClearedException {
+  @Released
+  public void setValue(RegionEntryContext context, @Unretained Object value) throws RegionClearedException {
     // @todo darrel: This will mark new entries as being recently used
     // It might be better to only mark them when they are modified.
     // Or should we only mark them on reads?
@@ -393,10 +458,14 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     setValue(context,value);
   }
   
-  protected void setValue(RegionEntryContext context, Object value, boolean recentlyUsed) {
-    // value = compress(context,value); // compress is now called in AbstractRegionMap.prepareValueForCache
-    
+  @Released
+  protected void setValue(RegionEntryContext context, @Unretained Object value, boolean recentlyUsed) {
     _setValue(value);
+    if (value != null && context != null && (this instanceof OffHeapRegionEntry) 
+        && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) {
+      ((OffHeapRegionEntry)this).release();
+      ((LocalRegion)context).checkReadiness();
+    }
     if (recentlyUsed) {
       setRecentlyUsed();
     }
@@ -473,21 +542,48 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     return value;    
   }
   
+  private static byte[] compressBytes(RegionEntryContext context, byte[] uncompressedBytes) {
+    byte[] result = uncompressedBytes;
+    if (isCompressible(context, uncompressedBytes)) {
+      long time = context.getCachePerfStats().startCompression();
+      result = context.getCompressor().compress(uncompressedBytes);
+      context.getCachePerfStats().endCompression(time, uncompressedBytes.length, result.length);
+    }
+    return result;
+  }
+  
+  
+  @Retained
   public final Object getValueInVM(RegionEntryContext context) {
-    Object v = _getValueUse(context, true);
+    SimpleMemoryAllocatorImpl.createReferenceCountOwner();
+    @Retained Object v = _getValueRetain(context, true);
     
     if (v == null) { // should only be possible if disk entry
       v = Token.NOT_AVAILABLE;
     }
-    return v;
+    @Retained Object result = OffHeapHelper.copyAndReleaseIfNeeded(v); // TODO OFFHEAP keep it offheap?
+    SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+    return result;
   }
   
+  @Retained
   public  Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) {
    return getValueInVM(owner);
   }
+  
   @Override
+  @Retained
   public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) {
-    return _getValueUse(owner, true);
+    @Retained Object result = _getValueRetain(owner, true);
+//    if (result instanceof ByteSource) {
+//      // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it
+//      Object deserVal = ((CachedDeserializable)result).getDeserializedForReading();
+//      if (deserVal != result) {
+//        OffHeapHelper.release(result);
+//        result = deserVal;
+//      }
+//    }
+    return result;
   }
   
   public Object getValueOnDisk(LocalRegion r)
@@ -542,7 +638,8 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         // Because the pr meta data region will not have an LRU.
         newValueToWrite = ((CachedDeserializable) newValueToWrite).getDeserializedValue(region, null);
         if (!create && newValueToWrite instanceof Versionable) {
-          final Object oldValue = getValueInVM(region); // Heap value should always be deserialized at this point // OFFHEAP will not be deserialized
+          @Retained @Released final Object oldValue = getValueInVM(region); // Heap value should always be deserialized at this point // OFFHEAP will not be deserialized
+          try {
           // BUGFIX for 35029. If oldValue is null the newValue should be put.
           if(oldValue == null) {
           	putValue = true;
@@ -552,6 +649,9 @@ public abstract class AbstractRegionEntry implements RegionEntry,
             Versionable ov = (Versionable) oldValue;
             putValue = nv.isNewerThan(ov);
           }  
+          } finally {
+            OffHeapHelper.release(oldValue);
+          }
         }
       }
 
@@ -593,7 +693,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
             }
           }
         } 
-        setValue(region, AbstractRegionMap.prepareValueForCache(region, newValueToWrite));
+        setValue(region, this.prepareValueForCache(region, newValueToWrite, false));
         result = true;
 
         if (newValueToWrite != Token.TOMBSTONE){
@@ -621,11 +721,12 @@ public abstract class AbstractRegionEntry implements RegionEntry,
    * @throws EntryNotFoundException if expectedOldValue is
    * not null and is not equal to current value
    */
+  @Released
   public final boolean destroy(LocalRegion region,
                             EntryEventImpl event,
                             boolean inTokenMode,
                             boolean cacheWrite,
-                            Object expectedOldValue,
+                            @Unretained Object expectedOldValue,
                             boolean forceDestroy,
                             boolean removeRecoveredEntry)
     throws CacheWriterException,
@@ -648,7 +749,10 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     // :ezoerner:20080814 We also read old value from disk or buffer
     // in the case where there is a non-null expectedOldValue
     // see PartitionedRegion#remove(Object key, Object value)
-    Object curValue = _getValueUse(region, true);
+    SimpleMemoryAllocatorImpl.skipRefCountTracking();
+    @Retained @Released Object curValue = _getValueRetain(region, true);
+    SimpleMemoryAllocatorImpl.unskipRefCountTracking();
+    try {
     if (curValue == null) curValue = Token.NOT_AVAILABLE;
     
     if (curValue == Token.NOT_AVAILABLE) {
@@ -675,7 +779,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     }
 
     if (expectedOldValue != null) {
-      if (!checkExpectedOldValue(expectedOldValue, curValue)) {
+      if (!checkExpectedOldValue(expectedOldValue, curValue, region)) {
         throw new EntryNotFoundException(
           LocalizedStrings.AbstractRegionEntry_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString());
       }
@@ -685,12 +789,15 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       proceed = true;
     }
     else {
-      proceed = event.setOldValue(curValue) || removeRecoveredEntry
+      proceed = event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl) || removeRecoveredEntry
                 || forceDestroy || region.getConcurrencyChecksEnabled() // fix for bug #47868 - create a tombstone
                 || (event.getOperation() == Operation.REMOVE // fix for bug #42242
                     && (curValue == null || curValue == Token.LOCAL_INVALID
                         || curValue == Token.INVALID));
     }
+    } finally {
+      OffHeapHelper.releaseWithNoTracking(curValue);
+    }
     } // end curValue block
     
     if (proceed) {
@@ -718,7 +825,16 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         if (indexManager != null) {
           try {
             if(isValueNull()) {
-              _setValue(AbstractRegionMap.prepareValueForCache(region, getValueInVMOrDiskWithoutFaultIn(region)));
+              @Released Object value = getValueOffHeapOrDiskWithoutFaultIn(region);
+              try {
+              _setValue(prepareValueForCache(region, value, false));
+              if (value != null && region != null && (this instanceof OffHeapRegionEntry) && region.isThisRegionBeingClosedOrDestroyed()) {
+                ((OffHeapRegionEntry)this).release();
+                region.checkReadiness();
+              }
+              } finally {
+                OffHeapHelper.release(value);
+              }
             }
             indexManager.updateIndexes(this,
                 IndexManager.REMOVE_ENTRY,
@@ -750,9 +866,17 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         removeEntry = true;
       }
 
-      if (removeEntry) {
+      // See #47887, we do not insert a tombstone for evicted HDFS
+      // entries since the value is still present in HDFS
+      // Check if we have to evict or just do destroy.
+      boolean forceRemoveEntry = 
+          (event.isEviction() || event.isExpiration()) 
+          && event.getRegion().isUsedForPartitionedRegionBucket()
+          && event.getRegion().getPartitionedRegion().isHDFSRegion();
+
+      if (removeEntry || forceRemoveEntry) {
         boolean isThisTombstone = isTombstone();
-        if(inTokenMode) {
+        if(inTokenMode && !event.getOperation().isEviction()) {
           setValue(region, Token.DESTROYED);  
         } else {
           removePhase1(region, false);
@@ -770,35 +894,161 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       return false;
     }
   }
+  
+ 
 
-  static boolean checkExpectedOldValue(Object expectedOldValue, Object actualValue) {
+  static boolean checkExpectedOldValue(@Unretained Object expectedOldValue, @Unretained Object actualValue, LocalRegion lr) {
     if (Token.isInvalid(expectedOldValue)) {
       return (actualValue == null) || Token.isInvalid(actualValue);
     } else {
-      return checkEquals(expectedOldValue, actualValue);
+      boolean isCompressedOffHeap = lr.getAttributes().getOffHeap() && lr.getAttributes().getCompressor() != null;
+      return checkEquals(expectedOldValue, actualValue, isCompressedOffHeap);
+    }
+  }
+  
+  private static boolean basicEquals(Object v1, Object v2) {
+    if (v2 != null) {
+      if (v2.getClass().isArray()) {
+        // fix for 52093
+        if (v2 instanceof byte[]) {
+          if (v1 instanceof byte[]) {
+            return Arrays.equals((byte[])v2, (byte[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof Object[]) {
+          if (v1 instanceof Object[]) {
+            return Arrays.deepEquals((Object[])v2, (Object[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof int[]) {
+          if (v1 instanceof int[]) {
+            return Arrays.equals((int[])v2, (int[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof long[]) {
+          if (v1 instanceof long[]) {
+            return Arrays.equals((long[])v2, (long[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof boolean[]) {
+          if (v1 instanceof boolean[]) {
+            return Arrays.equals((boolean[])v2, (boolean[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof short[]) {
+          if (v1 instanceof short[]) {
+            return Arrays.equals((short[])v2, (short[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof char[]) {
+          if (v1 instanceof char[]) {
+            return Arrays.equals((char[])v2, (char[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof float[]) {
+          if (v1 instanceof float[]) {
+            return Arrays.equals((float[])v2, (float[])v1);
+          } else {
+            return false;
+          }
+        } else if (v2 instanceof double[]) {
+          if (v1 instanceof double[]) {
+            return Arrays.equals((double[])v2, (double[])v1);
+          } else {
+            return false;
+          }
+        }
+        // fall through and call equals method
+      }
+      return v2.equals(v1);
+    } else {
+      return v1 == null;
     }
   }
   
-  static boolean checkEquals(Object v1, Object v2) {
+  static boolean checkEquals(@Unretained Object v1, @Unretained Object v2, boolean isCompressedOffHeap) {
     // need to give PdxInstance#equals priority
     if (v1 instanceof PdxInstance) {
       return checkPdxEquals((PdxInstance)v1, v2);
     } else if (v2 instanceof PdxInstance) {
       return checkPdxEquals((PdxInstance)v2, v1);
+    } else if (v1 instanceof OffHeapCachedDeserializable) {
+      return checkOffHeapEquals((OffHeapCachedDeserializable)v1, v2);
+    } else if (v2 instanceof OffHeapCachedDeserializable) {
+      return checkOffHeapEquals((OffHeapCachedDeserializable)v2, v1);
     } else if (v1 instanceof CachedDeserializable) {
-      return checkCDEquals((CachedDeserializable)v1, v2);
+      return checkCDEquals((CachedDeserializable)v1, v2, isCompressedOffHeap);
     } else if (v2 instanceof CachedDeserializable) {
-      return checkCDEquals((CachedDeserializable)v2, v1);
+      return checkCDEquals((CachedDeserializable)v2, v1, isCompressedOffHeap);
     } else {
-      if (v2 != null) {
-        return v2.equals(v1);
+      return basicEquals(v1, v2);
+    }
+  }
+  private static boolean checkOffHeapEquals(@Unretained OffHeapCachedDeserializable cd, @Unretained Object obj) {
+    if (cd.isSerializedPdxInstance()) {
+      PdxInstance pi = InternalDataSerializer.readPdxInstance(cd.getSerializedValue(), GemFireCacheImpl.getForPdx("Could not check value equality"));
+      return checkPdxEquals(pi, obj);
+    }
+    if (obj instanceof OffHeapCachedDeserializable) {
+      return cd.checkDataEquals((OffHeapCachedDeserializable)obj);
+    } else {
+      byte[] serializedObj;
+      if (obj instanceof CachedDeserializable) {
+        if (!cd.isSerialized()) {
+          if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
+            // both are byte[]
+            // obj must be DataAsAddress since it was not OffHeapCachedDeserializable
+            // so its byte[] will be small.
+            byte[] objBytes = (byte[]) ((StoredObject) obj).getDeserializedForReading();
+            return cd.checkDataEquals(objBytes);
+          } else {
+            return false;
+          }
+        }
+        serializedObj = ((CachedDeserializable) obj).getSerializedValue();
+      } else if (obj instanceof byte[]) {
+        if (cd.isSerialized()) {
+          return false;
+        }
+        serializedObj = (byte[]) obj;
       } else {
-        return v1 == null;
+        if (!cd.isSerialized()) {
+          return false;
+        }
+        if (obj == null || obj == Token.NOT_AVAILABLE
+            || Token.isInvalidOrRemoved(obj)) {
+          return false;
+        }
+        serializedObj = EntryEventImpl.serialize(obj);
       }
+      return cd.checkDataEquals(serializedObj);
     }
   }
   
-  private static boolean checkCDEquals(CachedDeserializable cd, Object obj) {
+  private static boolean checkCDEquals(CachedDeserializable cd, Object obj, boolean isCompressedOffHeap) {
+    if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) {
+      // cd is an actual byte[].
+      byte[] ba2;
+      if (obj instanceof StoredObject) {
+        if (!((StoredObject) obj).isSerialized()) {
+          return false;
+        }
+        ba2 = (byte[]) ((StoredObject) obj).getDeserializedForReading();
+      } else if (obj instanceof byte[]) {
+        ba2 = (byte[]) obj;
+      } else {
+        return false;
+      }
+      byte[] ba1 = (byte[]) cd.getDeserializedForReading();
+      return Arrays.equals(ba1, ba2);
+    }
     Object cdVal = cd.getValue();
     if (cdVal instanceof byte[]) {
       byte[] cdValBytes = (byte[])cdVal;
@@ -806,14 +1056,21 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       if (pi != null) {
         return checkPdxEquals(pi, obj);
       }
-      //byte[] serializedObj;
-      /**
-       * To be more compatible with previous releases do not compare the serialized forms here.
-       * Instead deserialize and call the equals method.
-       */
+      if (isCompressedOffHeap) { // fix for bug 52248
+        byte[] serializedObj;
+        if (obj instanceof CachedDeserializable) {
+          serializedObj = ((CachedDeserializable) obj).getSerializedValue();
+        } else {
+          serializedObj = EntryEventImpl.serialize(obj); 
+        }
+        return Arrays.equals(cdValBytes, serializedObj); 
+      } else {
+        /**
+         * To be more compatible with previous releases do not compare the serialized forms here.
+         * Instead deserialize and call the equals method.
+         */
       Object deserializedObj;
       if (obj instanceof CachedDeserializable) {
-        //serializedObj = ((CachedDeserializable) obj).getSerializedValue();
         deserializedObj =((CachedDeserializable) obj).getDeserializedForReading();
       } else {
         if (obj == null || obj == Token.NOT_AVAILABLE
@@ -822,15 +1079,19 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         }
         // TODO OPTIMIZE: Before serializing all of obj we could get the top
         // level class name of cdVal and compare it to the top level class name of obj.
-        //serializedObj = EntryEventImpl.serialize(obj);
         deserializedObj = obj;
       }
-      return cd.getDeserializedForReading().equals(deserializedObj);
+      return basicEquals(deserializedObj, cd.getDeserializedForReading());
+      }
 //      boolean result = Arrays.equals((byte[])cdVal, serializedObj);
 //      if (!result) {
 //        try {
 //          Object o1 = BlobHelper.deserializeBlob((byte[])cdVal);
 //          Object o2 = BlobHelper.deserializeBlob(serializedObj);
+//          SimpleMemoryAllocatorImpl.debugLog("checkCDEquals o1=<" + o1 + "> o2=<" + o2 + ">", false);
+//          if (o1.equals(o2)) {
+//            SimpleMemoryAllocatorImpl.debugLog("they are equal! a1=<" + Arrays.toString((byte[])cdVal) + "> a2=<" + Arrays.toString(serializedObj) + ">", false);
+//          }
 //        } catch (IOException e) {
 //          // TODO Auto-generated catch block
 //          e.printStackTrace();
@@ -847,7 +1108,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         // class name of cdVal and the top level class name of obj and compare.
         obj = ((CachedDeserializable) obj).getDeserializedForReading();
       }
-      return cdVal.equals(obj);
+      return basicEquals(cdVal, obj);
     }
   }
   /**
@@ -857,6 +1118,10 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     if (!(obj instanceof PdxInstance)) {
       // obj may be a CachedDeserializable in which case we want to convert it to a PdxInstance even if we are not readSerialized.
       if (obj instanceof CachedDeserializable) {
+        if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
+          // obj is actually a byte[] which will never be equal to a PdxInstance
+          return false;
+        }
         Object cdVal = ((CachedDeserializable) obj).getValue();
         if (cdVal instanceof byte[]) {
           byte[] cdValBytes = (byte[]) cdVal;
@@ -901,7 +1166,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         }
       }
     }
-    return obj.equals(pdx);
+    return basicEquals(obj, pdx);
   }
 
   
@@ -937,6 +1202,16 @@ public abstract class AbstractRegionEntry implements RegionEntry,
 
   public abstract Object getKey();
   
+  protected static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) {
+    if (v == null) return false;
+    if (Token.isInvalidOrRemoved(v)) return false;
+    if (v == Token.NOT_AVAILABLE) return false;
+    if (v instanceof DiskEntry.RecoveredEntry) return false; // The disk layer has special logic that ends up storing the nested value in the RecoveredEntry off heap
+    if (!(e instanceof OffHeapRegionEntry)) return false;
+    // TODO should we check for deltas here or is that a user error?
+    return true;
+  }
+
   /**
    * Default implementation. Override in subclasses with primitive keys
    * to prevent creating an Object form of the key for each equality check.
@@ -993,6 +1268,120 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     } while(!done);
   }
 
+  @Override
+  @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
+  public  Object prepareValueForCache(RegionEntryContext r,
+      @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val,
+      boolean isEntryUpdate) {
+    return prepareValueForCache(r, val, null, isEntryUpdate);
+  }
+
+  @Override
+  @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
+  public  Object prepareValueForCache(RegionEntryContext r,
+      @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val,
+      EntryEventImpl event, boolean isEntryUpdate) {
+    if (r != null && r.getOffHeap() && okToStoreOffHeap(val, this)) {
+      if (val instanceof StoredObject) {
+        // Check to see if val has the same compression settings as this region.
+        // The recursive calls in this section are safe because
+        // we only do it after copy the off-heap value to the heap.
+        // This is needed to fix bug 52057.
+        StoredObject soVal = (StoredObject) val;
+        assert !soVal.isCompressed();
+        if (r.getCompressor() != null) {
+          // val is uncompressed and we need a compressed value.
+          // So copy the off-heap value to the heap in a form that can be compressed.
+          byte[] valAsBytes = soVal.getValueAsHeapByteArray();
+          Object heapValue;
+          if (soVal.isSerialized()) {
+            heapValue = CachedDeserializableFactory.create(valAsBytes);
+          } else {
+            heapValue = valAsBytes;
+          }
+          return prepareValueForCache(r, heapValue, event, isEntryUpdate);
+        }
+        if (val instanceof Chunk) {
+          // if the reused guy has a refcount then need to inc it
+          if (!((Chunk)val).retain()) {
+            throw new IllegalStateException("Could not use an off heap value because it was freed");
+          }
+        }
+        // else it is DataAsAddress. This code just returns it as prepared.
+        // TODO OFFHEAP: Review the callers to see if they will handle DataAsAddress correctly.
+      } else {
+        byte[] data;
+        boolean isSerialized = !(val instanceof byte[]);
+        if (isSerialized) {
+          if (event != null && event.getCachedSerializedNewValue() != null) {
+            data = event.getCachedSerializedNewValue();
+          } else if (val instanceof CachedDeserializable) {
+            data = ((CachedDeserializable)val).getSerializedValue();
+            // TODO OFFHEAP: cache data in event?
+          } else if (val instanceof PdxInstance) {
+            try {
+              data = ((ConvertableToBytes)val).toBytes();
+              // TODO OFFHEAP: cache data in event?
+            } catch (IOException e) {
+              throw new PdxSerializationException("Could not convert " + val + " to bytes", e);
+            }
+          } else {
+            data = EntryEventImpl.serialize(val);
+            // TODO OFFHEAP: cache data in event?
+          }
+        } else {
+          data = (byte[]) val;
+        }
+        byte[] compressedData = compressBytes(r, data);
+        boolean isCompressed = compressedData != data;
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(this);
+        MemoryAllocator ma = SimpleMemoryAllocatorImpl.getAllocator(); // fix for bug 47875
+        val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed, GemFireChunk.TYPE); // TODO:KIRK:48068 race happens right after this line
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+        if (val instanceof GemFireChunk) {
+          val = new com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.ChunkWithHeapForm((GemFireChunk)val, data);
+        }
+//        if (val instanceof Chunk && r instanceof LocalRegion) {
+//          Chunk c = (Chunk) val;
+//          LocalRegion lr = (LocalRegion) r;
+//          SimpleMemoryAllocatorImpl.debugLog("allocated @" + Long.toHexString(c.getMemoryAddress()) + " reg=" + lr.getFullPath(), false);
+//        }
+      }
+      return val;
+    }
+    @Unretained Object nv = val;
+    if (nv instanceof StoredObject) {
+      // This off heap value is being put into a on heap region.
+      byte[] data = ((StoredObject) nv).getSerializedValue();
+      nv = CachedDeserializableFactory.create(data);
+    }
+    // don't bother checking for SQLFire
+    if (!GemFireCacheImpl.sqlfSystem() && nv instanceof PdxInstanceImpl) {
+      // We do not want to put PDXs in the cache as values.
+      // So get the serialized bytes and use a CachedDeserializable.
+      try {
+        byte[] data = ((ConvertableToBytes)nv).toBytes();
+        byte[] compressedData = compressBytes(r, data);
+        if (data == compressedData) {
+          nv = CachedDeserializableFactory.create(data);
+        } else {
+          nv = compressedData;
+        }
+      } catch (IOException e) {
+        throw new PdxSerializationException("Could not convert " + nv + " to bytes", e);
+      }
+    } else {
+      nv = compress(r, nv, event);
+    }
+    return nv;
+  }
+  
+  @Override
+  @Unretained
+  public final Object _getValue() {
+    return getValueField();
+  }
+
   public final boolean isUpdateInProgress() {
     return areAnyBitsSet(UPDATE_IN_PROGRESS);
   }
@@ -1037,6 +1426,29 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     TXManagerImpl.incRefCount(this);
     setInUseByTransaction(true);
   }
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final boolean isMarkedForEviction() {
+    return areAnyBitsSet(MARKED_FOR_EVICTION);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void setMarkedForEviction() {
+    setBits(MARKED_FOR_EVICTION);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public final void clearMarkedForEviction() {
+    clearBits(~MARKED_FOR_EVICTION);
+  }
   
   @Override
   public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) {
@@ -1068,16 +1480,12 @@ public abstract class AbstractRegionEntry implements RegionEntry,
    * Instead of overriding this method; override areSetValue.
    */
   protected final void _setValue(Object val) {
-    areSetValue(val);
+    setValueField(val);
   }
   
   @Override
-  public final Object _getValue() {
-    return areGetValue();
-  }
-  @Override
   public Token getValueAsToken() {
-    Object v = areGetValue();
+    Object v = getValueField();
     if (v == null || v instanceof Token) {
       return (Token)v;
     } else {
@@ -1085,11 +1493,23 @@ public abstract class AbstractRegionEntry implements RegionEntry,
     }
   }
   
-  protected abstract Object areGetValue();
-  protected abstract void areSetValue(Object v);
+  /**
+   * Reads the value of this region entry.
+   * Provides low level access to the value field.
+   * @return possible OFF_HEAP_OBJECT (caller uses region entry reference)
+   */
+  @Unretained
+  protected abstract Object getValueField();
+  /**
+   * Set the value of this region entry.
+   * Provides low level access to the value field.
+   * @param v the new value to set
+   */
+  protected abstract void setValueField(@Unretained Object v);
 
+  @Retained
   public Object getTransformedValue() {
-    return _getValueUse(null, false);
+    return _getValueRetain(null, false);
   }
   
   public final boolean getValueWasResultOfSearch() {
@@ -1187,7 +1607,19 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       VersionTag tag = VersionTag.create(mbr);
       tag.setEntryVersion(v);
       if (region.getVersionVector() != null) {
-        tag.setRegionVersion(region.getVersionVector().getNextVersion());
+        // Use region version if already provided, else generate
+        long nextRegionVersion = event.getNextRegionVersion();
+        if (nextRegionVersion != -1) {
+          // Set on the tag and record it locally
+          tag.setRegionVersion(nextRegionVersion);
+          RegionVersionVector rvv = region.getVersionVector();
+          rvv.recordVersion(rvv.getOwnerId(),nextRegionVersion);
+          if (logger.isDebugEnabled()) {
+            logger.debug("recorded region version {}; region={}", nextRegionVersion, region.getFullPath());
+          }
+        } else {
+          tag.setRegionVersion(region.getVersionVector().getNextVersion());  
+        }
       }
       if (withDelta) {
         tag.setPreviousMemberID(previous);
@@ -1214,7 +1646,10 @@ public abstract class AbstractRegionEntry implements RegionEntry,
       stamp.setMemberID(mbr);
       event.setVersionTag(tag);
       if (logger.isDebugEnabled()) {
-        logger.debug("generated tag {}; region={}; rvv={}", tag, region.getFullPath(), region.getVersionVector());
+        logger.debug("generated tag {}; key={}; oldvalue={} newvalue={} client={} region={}; rvv={}", tag,
+            event.getKey(), event.getOldValue(), event.getNewValue(),
+            (event.getContext() == null? "none" : event.getContext().getDistributedMember().getName()),
+            region.getFullPath(), region.getVersionVector());
       }
       return tag;
     }
@@ -1692,6 +2127,8 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         SystemFailure.checkFailure();
         logger.error(LocalizedMessage.create(LocalizedStrings.LocalRegion_EXCEPTION_OCCURRED_IN_CONFLICTRESOLVER), t);
         thr = t;
+      } finally {
+        timestampedEvent.release();
       }
 
       if (isDebugEnabled) {
@@ -1775,8 +2212,13 @@ public abstract class AbstractRegionEntry implements RegionEntry,
    */
   public static final int MAX_INLINE_STRING_KEY_BYTE_ENCODING = 15;
   
+  /**
+   * This is only retained in off-heap subclasses.  However, it's marked as
+   * Retained here so that callers are aware that the value may be retained.
+   */
   @Override
-  public Object _getValueUse(RegionEntryContext context, boolean decompress) {
+  @Retained 
+  public Object _getValueRetain(RegionEntryContext context, boolean decompress) {
     if (decompress) {
       return decompress(context, _getValue());
     } else {


Mime
View raw message