geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [25/57] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Thu, 09 Jul 2015 17:02:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index b3cecf3..790dc4d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -8,6 +8,8 @@
 
 package com.gemstone.gemfire.internal.cache;
 
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -80,6 +82,7 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
 import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
 import com.gemstone.gemfire.internal.cache.RemoteFetchVersionMessage.FetchVersionResponse;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
 import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
 import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
 import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultSender;
@@ -101,12 +104,18 @@ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationE
 import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
 import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
 import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
 import com.gemstone.org.jgroups.util.StringId;
@@ -1243,11 +1252,13 @@ public class DistributedRegion extends LocalRegion implements
   private boolean giiMissingRequiredRoles = false;
 
   /**
-   * A reference counter to protected the heapThresholdReached boolean
+   * A reference counter to protect the memoryThresholdReached boolean
    */
-  private final Set<DistributedMember> heapThresholdReachedMembers =
+  private final Set<DistributedMember> memoryThresholdReachedMembers =
     new CopyOnWriteArraySet<DistributedMember>();
 
+  private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+
   /** Sets and returns giiMissingRequiredRoles */
   private boolean checkInitialImageForReliability(
       InternalDistributedMember imageTarget,
@@ -1323,7 +1334,7 @@ public class DistributedRegion extends LocalRegion implements
     // remote members
     if (!isInternalRegion()) {
       if (!this.isDestroyed) {
-        cache.getResourceManager().addResourceListener(this);
+        cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
       }
     }
     
@@ -1530,8 +1541,8 @@ public class DistributedRegion extends LocalRegion implements
       return;
     }
 
-    if (this.entries.size() > 0) {
-      this.entries.clear(null);
+    if (!this.entries.isEmpty()) {
+      closeEntries();
       if (getDiskRegion() != null) {
         getDiskRegion().clear(this, null);
       }
@@ -1709,7 +1720,7 @@ public class DistributedRegion extends LocalRegion implements
         // clear any entries received in the GII that are older than the RVV versions.
         // this can happen if entry chunks were received prior to the clear() being
         // processed
-        this.entries.clear(rvv);
+        clearEntries(rvv);
       }
       //need to do this before we release the afterGetInitialImageLatch
       if(persistenceAdvisor != null) {
@@ -2316,6 +2327,7 @@ public class DistributedRegion extends LocalRegion implements
        this.filterProfile != null && this.filterProfile.hasCQs();
     profile.gatewaySenderIds = getGatewaySenderIds();
     profile.asyncEventQueueIds = getAsyncEventQueueIds();
+    profile.isOffHeap = getOffHeap();
   }
 
   /**
@@ -2398,9 +2410,10 @@ public class DistributedRegion extends LocalRegion implements
 
   /** @return the deserialized value */
   @Override
+  @Retained
   protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
       TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
-        boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones)
+        boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
       throws CacheLoaderException, TimeoutException
   {
     checkForLimitedOrNoAccess();
@@ -2418,13 +2431,17 @@ public class DistributedRegion extends LocalRegion implements
     long lastModified = 0L;
     boolean fromServer = false;
     EntryEventImpl event = null;
+    @Retained Object result = null;
+    boolean incrementUseCountForSqlf = false;
+    try {
     {
       if (this.srp != null) {
         EntryEventImpl holder = EntryEventImpl.createVersionTagHolder();
+        try {
         Object value = this.srp.get(key, aCallbackArgument, holder);
         fromServer = value != null;
         if (fromServer) {
-          event = new EntryEventImpl(this, op, key, value,
+          event = EntryEventImpl.create(this, op, key, value,
                                      aCallbackArgument, false,
                                      getMyId(), generateCallbacks);
           event.setVersionTag(holder.getVersionTag());
@@ -2433,12 +2450,15 @@ public class DistributedRegion extends LocalRegion implements
             clientEvent.setVersionTag(holder.getVersionTag());
           }
         }
+        } finally {
+          holder.release();
+        }
       }
     }
     
     if (!fromServer) {
       //Do not generate Event ID
-      event = new EntryEventImpl(this, op, key, null /*newValue*/,
+      event = EntryEventImpl.create(this, op, key, null /*newValue*/,
                                  aCallbackArgument, false,
                                  getMyId(), generateCallbacks);
       if (requestingClient != null) {
@@ -2459,7 +2479,7 @@ public class DistributedRegion extends LocalRegion implements
         processor.release();
       }
     }
-    if (event.hasNewValue() && !isHeapThresholdReachedForLoad()) {
+    if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
       try {
         // Set eventId. Required for interested clients.
         event.setNewEventId(cache.getDistributedSystem());
@@ -2478,7 +2498,7 @@ public class DistributedRegion extends LocalRegion implements
           	((BucketRegion)this).handleWANEvent(event);
           }
           re = basicPutEntry(event, lastModified);
-          
+          incrementUseCountForSqlf = GemFireCacheImpl.sqlfSystem() ;
         } catch (ConcurrentCacheModificationException e) {
           // the cache was modified while we were searching for this entry and
           // the netsearch result was elided.  Return the current value from the cache
@@ -2500,17 +2520,39 @@ public class DistributedRegion extends LocalRegion implements
     if (isCreate) {
       recordMiss(re, key);
     }
-    Object result;
+    
     if (preferCD) {
-      result = event.getRawNewValue();
-      // fix for bug 42895
-      if (!(result instanceof CachedDeserializable)) {
+      if (event.hasDelta()) {
         result = event.getNewValue();
-      }
+      } else {
+        result = event.getRawNewValueAsHeapObject();
+      }    
     } else {
-      result = event.getNewValue();
+      result = event.getNewValue();     
+    }
+    // For SQLFire, we need to increment the use count so that the returned
+    // object has a use count of 2.
+    if( incrementUseCountForSqlf && result instanceof Chunk) {
+      ((Chunk)result).retain();
     }
     return result;
+    } finally {
+      if (event != null) {
+        event.release();        
+      }
+    }
+  }
+  
+  protected ConcurrentParallelGatewaySenderQueue getHDFSQueue() {
+    if (this.hdfsQueue == null) {
+      String asyncQId = this.getPartitionedRegion().getHDFSEventQueueName();
+      final AsyncEventQueueImpl asyncQ =  (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId);
+      final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
+      AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
+      if (ep == null) return null;
+      hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
+    }
+    return hdfsQueue;
   }
 
   /** hook for subclasses to note that a cache load was performed
@@ -4151,13 +4193,17 @@ public class DistributedRegion extends LocalRegion implements
   }
 
   @Override
-  protected void setHeapThresholdFlag(MemoryEvent event) {
+  protected void setMemoryThresholdFlag(MemoryEvent event) {
     Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
 
     if (event.isLocal() || others.contains(event.getMember())) {
-      if (event.getType().isCriticalUp()) {
-        setHeapThresholdReachedCounterTrue(event.getMember());
-      } else if (event.getType().isCriticalDown() || event.getType().isCriticalDisabled()) {
+      if (event.getState().isCritical()
+          && !event.getPreviousState().isCritical()
+          && (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
+        setMemoryThresholdReachedCounterTrue(event.getMember());
+      } else if (!event.getState().isCritical()
+          && event.getPreviousState().isCritical()
+          && (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
         removeMemberFromCriticalList(event.getMember());
       }
     }
@@ -4168,28 +4214,28 @@ public class DistributedRegion extends LocalRegion implements
     if (logger.isDebugEnabled()) {
       logger.debug("DR: removing member {} from critical member list", member);
     }
-    synchronized(this.heapThresholdReachedMembers) {
-      this.heapThresholdReachedMembers.remove(member);
-      if (this.heapThresholdReachedMembers.size() == 0) {
-        heapThresholdReached.set(false);
+    synchronized(this.memoryThresholdReachedMembers) {
+      this.memoryThresholdReachedMembers.remove(member);
+      if (this.memoryThresholdReachedMembers.size() == 0) {
+        memoryThresholdReached.set(false);
       }
     }
   }
   
   @Override
-  public Set<DistributedMember> getHeapThresholdReachedMembers() {
-    synchronized (this.heapThresholdReachedMembers) {
-      return Collections.unmodifiableSet(this.heapThresholdReachedMembers);
+  public Set<DistributedMember> getMemoryThresholdReachedMembers() {
+    synchronized (this.memoryThresholdReachedMembers) {
+      return Collections.unmodifiableSet(this.memoryThresholdReachedMembers);
     }
   }
 
   @Override
-  public void initialCriticalMembers(boolean localHeapIsCritical,
+  public void initialCriticalMembers(boolean localMemoryIsCritical,
       Set<InternalDistributedMember> critialMembers) {
     Set<InternalDistributedMember> others = getCacheDistributionAdvisor().adviseGeneric();
     for (InternalDistributedMember idm: critialMembers) {
       if (others.contains(idm)) {
-        setHeapThresholdReachedCounterTrue(idm);
+        setMemoryThresholdReachedCounterTrue(idm);
       }
     }
   }
@@ -4197,11 +4243,11 @@ public class DistributedRegion extends LocalRegion implements
   /**
    * @param idm member whose threshold has been exceeded
    */
-  private void setHeapThresholdReachedCounterTrue(final DistributedMember idm) {
-    synchronized(this.heapThresholdReachedMembers) {
-      this.heapThresholdReachedMembers.add(idm);
-      if (this.heapThresholdReachedMembers.size() > 0) {
-        heapThresholdReached.set(true);
+  private void setMemoryThresholdReachedCounterTrue(final DistributedMember idm) {
+    synchronized(this.memoryThresholdReachedMembers) {
+      this.memoryThresholdReachedMembers.add(idm);
+      if (this.memoryThresholdReachedMembers.size() > 0) {
+        memoryThresholdReached.set(true);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegionFunctionStreamingMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegionFunctionStreamingMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegionFunctionStreamingMessage.java
index e446887..74edb38 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegionFunctionStreamingMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegionFunctionStreamingMessage.java
@@ -443,4 +443,9 @@ public class DistributedRegionFunctionStreamingMessage extends DistributionMessa
   public boolean canParticipateInTransaction() {
     return true;
   }
+  
+  @Override
+  public boolean isTransactionDistributed() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
index c7a993b..b62d430 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRemoveAllOperation.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.EntryVersionsList;
+import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
 import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
 import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
@@ -90,6 +91,22 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
           return removeAllData;
   }
   
+  public void setRemoveAllEntryData(RemoveAllEntryData[] removeAllEntryData) {
+    for (int i = 0; i < removeAllEntryData.length; i++) {
+      removeAllData[i] = removeAllEntryData[i];
+    }
+    this.removeAllDataSize = removeAllEntryData.length;
+  }
+  
+  /**
+   * Add an entry that this removeAll operation should distribute.
+   */
+  public void addEntry(RemoveAllEntryData removeAllEntry)
+  {
+    this.removeAllData[this.removeAllDataSize] = removeAllEntry;
+    this.removeAllDataSize += 1;
+  }
+  
   /**
    * Add an entry that this removeAll operation should distribute.
    */
@@ -161,6 +178,16 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
     };
   }
   
+  public void freeOffHeapResources() {
+    // I do not use eventIterator here because it forces the lazy creation of EntryEventImpl by calling getEventForPosition.
+    for (int i=0; i < this.removeAllDataSize; i++) {
+      RemoveAllEntryData entry = this.removeAllData[i];
+      if (entry != null && entry.event != null) {
+        entry.event.release();
+      }
+    }
+  }
+
   public EntryEventImpl getEventForPosition(int position) {
     RemoveAllEntryData entry = this.removeAllData[position];
     if (entry == null) {
@@ -170,7 +197,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
       return entry.event;
     }
     LocalRegion region = (LocalRegion)this.event.getRegion();
-    EntryEventImpl ev = new EntryEventImpl(
+    EntryEventImpl ev = EntryEventImpl.create(
         region,
         entry.getOp(),
         entry.getKey(), null/* value */, this.event.getCallbackArgument(),
@@ -178,6 +205,8 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
         this.event.getDistributedMember(),
         this.event.isGenerateCallbacks(),
         entry.getEventID());
+    boolean returnedEv = false;
+    try {
     ev.setPossibleDuplicate(entry.isPossibleDuplicate());
     if (entry.versionTag != null && region.concurrencyChecksEnabled) {
       VersionSource id = entry.versionTag.getMemberID();
@@ -189,6 +218,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
     }
       
     entry.event = ev;
+    returnedEv = true;
     ev.setOldValue(entry.getOldValue());
     CqService cqService = region.getCache().getCqService();
     if (cqService.isRunning() && !entry.getOp().isCreate() && !ev.hasOldValue()) {
@@ -201,6 +231,11 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
     ev.callbacksInvoked(entry.isCallbacksInvoked());
     ev.setTailKey(entry.getTailKey());
     return ev;
+    } finally {
+      if (!returnedEv) {
+        ev.release();
+      }
+    }
   }
 
   public final EntryEventImpl getBaseEvent() {
@@ -604,6 +639,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
       if (prMsg == null) {
         prMsg = new RemoveAllPRMessage(bucketId.intValue(), removeAllDataSize, false,
             event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
+        prMsg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
 
         // set dpao's context(original sender) into each PutAllMsg
         // dpao's event's context could be null if it's P2P putAll in PR
@@ -798,7 +834,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
     throws EntryNotFoundException
     {
       // Gester: We have to specify eventId for the message of MAP
-      EntryEventImpl event = new EntryEventImpl(
+      EntryEventImpl event = EntryEventImpl.create(
           rgn,
           Operation.REMOVEALL_DESTROY, null /* key */, null/* value */,
           this.callbackArg, true /* originRemote */, getSender());
@@ -858,6 +894,7 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
             rgn.getVersionVector().recordVersion(getSender(), ev.getVersionTag());
           }
         }
+        ev.release();
       }
     }
     
@@ -883,10 +920,12 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
         ((KeyWithRegionContext)key).setRegionContext(rgn);
       }
       EventID evId = entry.getEventID();
-      EntryEventImpl ev = new EntryEventImpl(rgn, entry.getOp(),
+      EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(),
           key, null/* value */, callbackArg,
           originRemote, sender, !skipCallbacks,
           evId);
+      boolean returnedEv = false;
+      try {
       if (context != null) {
         ev.context = context;
       }
@@ -907,7 +946,13 @@ public class DistributedRemoveAllOperation extends AbstractUpdateOperation // TO
        * Setting tailKey for the secondary bucket here. Tail key was update by the primary.
        */
       ev.setTailKey(entry.getTailKey());
+      returnedEv = true;
       return ev;
+      } finally {
+        if (!returnedEv) {
+          ev.release();
+        }
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index ab4887a..e6503f0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger;
 import com.gemstone.gemfire.CopyHelper;
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.DeltaSerializationException;
+import com.gemstone.gemfire.GemFireIOException;
 import com.gemstone.gemfire.InvalidDeltaException;
 import com.gemstone.gemfire.SerializationException;
 import com.gemstone.gemfire.SystemFailure;
@@ -46,14 +47,16 @@ import com.gemstone.gemfire.internal.DSFIDFactory;
 import com.gemstone.gemfire.internal.DataSerializableFixedID;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.InternalDataSerializer.Sendable;
+import com.gemstone.gemfire.internal.Sendable;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
 import com.gemstone.gemfire.internal.cache.delta.Delta;
 import com.gemstone.gemfire.internal.cache.lru.Sizeable;
 import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tx.DistTxKeyInfo;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackArgument;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -61,6 +64,21 @@ import com.gemstone.gemfire.internal.lang.StringUtils;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.OffHeapReference;
+import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
+import com.gemstone.gemfire.internal.offheap.Releasable;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
+
+import com.gemstone.gemfire.internal.util.ArrayUtils;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
 
@@ -70,6 +88,7 @@ import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
 // must be public for DataSerializableFixedID
 public class EntryEventImpl
   implements EntryEvent, InternalCacheEvent, DataSerializableFixedID, EntryOperation
+             , Releasable
 {
   private static final Logger logger = LogService.getLogger();
   
@@ -92,7 +111,8 @@ public class EntryEventImpl
    * But it is only non-null if setSerializedNewValue was called.
    */
   private byte[] cachedSerializedNewValue = null;
-  protected Object oldValue = null;
+  @Retained(ENTRY_EVENT_OLD_VALUE)
+  private Object oldValue = null;
   protected Delta delta = null;
  
   protected short eventFlags = 0x0000;
@@ -109,14 +129,14 @@ public class EntryEventImpl
    *
    * @since 5.0
    */
-  private transient DistributedPutAllOperation putAllOp;
+  protected transient DistributedPutAllOperation putAllOp;
 
   /**
    * This field will be null unless this event is used for a removeAll operation.
    *
    * @since 8.1
    */
-  private transient DistributedRemoveAllOperation removeAllOp;
+  protected transient DistributedRemoveAllOperation removeAllOp;
 
   /**
    * The member that originated this event
@@ -166,6 +186,18 @@ public class EntryEventImpl
   /** version tag for concurrency checks */
   protected VersionTag versionTag;
 
+  /** boolean to indicate that this operation should be optimized by not fetching from HDFS*/
+  private transient boolean fetchFromHDFS = true;
+  
+  private transient boolean isPutDML = false;
+
+  /** boolean to indicate that the RegionEntry for this event was loaded from HDFS*/
+  private transient boolean loadedFromHDFS= false;
+  
+  private transient boolean isCustomEviction = false;
+  
+  /** boolean to indicate that the RegionEntry for this event has been evicted*/
+  private transient boolean isEvicted = false;
   
   private transient boolean isPendingSecondaryExpireDestroy = false;
   
@@ -179,8 +211,9 @@ public class EntryEventImpl
    * and anything else of use while processing another event
    * @return the empty event object
    */
+  @Retained
   public static EntryEventImpl createVersionTagHolder() {
-    return new EntryEventImpl();
+    return createVersionTagHolder(null);
   }
   
   /**
@@ -188,9 +221,11 @@ public class EntryEventImpl
    * and anything else of use while processing another event
    * @return the empty event object
    */
+  @Retained
   public static EntryEventImpl createVersionTagHolder(VersionTag tag) {
     EntryEventImpl result = new EntryEventImpl();
     result.setVersionTag(tag);
+    result.disallowOffHeapValues();
     return result;
   }
 
@@ -235,38 +270,8 @@ public class EntryEventImpl
     this.tailKey = DataSerializer.readLong(in);
   }
 
-  public EntryEventImpl(LocalRegion region,
-      Operation op,
-      Object key, Object newValue, Object callbackArgument,
-      boolean originRemote, DistributedMember distributedMember) {
-    // Added initializeId=true as the default case. This doesn't
-    // mean that the id will be initialized for sure. It only
-    // means that it will not be set after the entry is instantiated.
-    // UpdateOperation and DestroyOperation both set the id after the
-    // entry is instantiated. Other callers do not.
-    this(region, op, key, newValue, callbackArgument,
-        originRemote, distributedMember, true/* generateCallbacks */,
-        true /*initializeId*/);
-  }
-
-  public EntryEventImpl(LocalRegion region,
-      Operation op,
-      Object key,
-      Object newValue,
-      Object callbackArgument,
-      boolean originRemote,
-      DistributedMember distributedMember,
-      boolean generateCallbacks) {
-    // Added initializeId=true as the default case. This doesn't
-    // mean that the id will be initialized for sure. It only
-    // means that it will not be set after the entry is instantiated.
-    // UpdateOperation and DestroyOperation both set the id after the
-    // entry is instantiated. Other callers do not.
-    this(region, op, key, newValue, callbackArgument, originRemote,
-        distributedMember, generateCallbacks, true/*initializeId*/);
-  }
-  
-  public EntryEventImpl(LocalRegion region, Operation op, Object key,
+  @Retained
+  protected EntryEventImpl(LocalRegion region, Operation op, Object key,
       boolean originRemote, DistributedMember distributedMember,
       boolean generateCallbacks, boolean fromRILocalDestroy) {
     this.region = region;
@@ -282,9 +287,10 @@ public class EntryEventImpl
    * Doesn't specify oldValue as this will be filled in later as part of an
    * operation on the region, or lets it default to null.
    */
-  public EntryEventImpl(
+  @Retained
+  protected EntryEventImpl(
       final LocalRegion region,
-      Operation op, Object key, Object newVal,
+      Operation op, Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal,
       Object callbackArgument,
       boolean originRemote, DistributedMember distributedMember,
       boolean generateCallbacks, boolean initializeId) {
@@ -316,8 +322,9 @@ public class EntryEventImpl
   /**
    * Called by BridgeEntryEventImpl to use existing EventID
    */
-  public EntryEventImpl(LocalRegion region, Operation op, Object key,
-      Object newValue, Object callbackArgument, boolean originRemote,
+  @Retained
+  protected EntryEventImpl(LocalRegion region, Operation op, Object key,
+      @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, boolean originRemote,
       DistributedMember distributedMember, boolean generateCallbacks,
       EventID eventID) {
     this(region, op, key, newValue,
@@ -330,29 +337,38 @@ public class EntryEventImpl
   /**
    * create an entry event from another entry event
    */
-  public EntryEventImpl(EntryEventImpl other) {
+  @Retained
+  public EntryEventImpl(@Retained({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) EntryEventImpl other) {
+    this(other, true);
+  }
+  
+  @Retained
+  public EntryEventImpl(@Retained({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE}) EntryEventImpl other, boolean setOldValue) {
     region = other.region;
-    
+
     this.eventID = other.eventID;
     basicSetNewValue(other.basicGetNewValue());
     this.newValueBytes = other.newValueBytes;
     this.cachedSerializedNewValue = other.cachedSerializedNewValue;
     this.re = other.re;
     this.delta = other.delta;
-    basicSetOldValue(other.basicGetOldValue(), true);
-    this.oldValueBytes = other.oldValueBytes;
-    eventFlags = other.eventFlags;
+    if (setOldValue) {
+      retainAndSetOldValue(other.basicGetOldValue());
+      this.oldValueBytes = other.oldValueBytes;
+    }
+    this.eventFlags = other.eventFlags;
     setEventFlag(EventFlags.FLAG_CALLBACKS_INVOKED, false);
     txId = other.txId;
     op = other.op;
     distributedMember = other.distributedMember;
     this.filterInfo = other.filterInfo;
+    this.keyInfo = other.keyInfo.isDistKeyInfo() ? new DistTxKeyInfo(
+        (DistTxKeyInfo) other.keyInfo) : new KeyInfo(other.keyInfo);
     if (other.getRawCallbackArgument() instanceof GatewaySenderEventCallbackArgument) {
-      this.keyInfo = new KeyInfo(other.keyInfo);
-      this.keyInfo.setCallbackArg((new GatewaySenderEventCallbackArgument(
-          (GatewaySenderEventCallbackArgument)other.getRawCallbackArgument())));
-    } else {
-      this.keyInfo = new KeyInfo(other.keyInfo);
+      this.keyInfo
+          .setCallbackArg((new GatewaySenderEventCallbackArgument(
+              (GatewaySenderEventCallbackArgument) other
+                  .getRawCallbackArgument())));
     }
     this.context = other.context;
     this.deltaBytes = other.deltaBytes;
@@ -362,6 +378,7 @@ public class EntryEventImpl
     this.setPossibleDuplicate(other.isPossibleDuplicate()); 
   }
 
+  @Retained
   public EntryEventImpl(Object key2) {
     this.keyInfo = new KeyInfo(key2, null, null);
   }
@@ -372,35 +389,118 @@ public class EntryEventImpl
    * used in cache operations.
    * @param id the identity of the client's event
    */
+  @Retained
   public EntryEventImpl(EventID id) {
     this.eventID = id;
+    this.offHeapOk = false;
   }
 
   /**
+   * Creates and returns an EntryEventImpl.  Generates and assigns a bucket id to the
+   * EntryEventImpl if the region parameter is a PartitionedRegion.
+   */  
+  @Retained
+  public static EntryEventImpl create(LocalRegion region,
+      Operation op,
+      Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument,
+      boolean originRemote, DistributedMember distributedMember) {
+    return create(region,op,key,newValue,callbackArgument,originRemote,distributedMember,true,true);
+  }
+  
+  /**
+   * Creates and returns an EntryEventImpl.  Generates and assigns a bucket id to the
+   * EntryEventImpl if the region parameter is a PartitionedRegion.
+   */
+  @Retained
+  public static EntryEventImpl create(LocalRegion region,
+      Operation op,
+      Object key,
+      @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue,
+      Object callbackArgument,
+      boolean originRemote,
+      DistributedMember distributedMember,
+      boolean generateCallbacks) {
+    return create(region, op, key, newValue, callbackArgument, originRemote,
+        distributedMember, generateCallbacks,true);
+  }
+  
+  /**
+   * Creates and returns an EntryEventImpl.  Generates and assigns a bucket id to the
+   * EntryEventImpl if the region parameter is a PartitionedRegion.
+   *  
+   * Called by BridgeEntryEventImpl to use existing EventID
+   * 
+   * {@link EntryEventImpl#EntryEventImpl(LocalRegion, Operation, Object, Object, Object, boolean, DistributedMember, boolean, EventID)}
+   */ 
+  @Retained
+  public static EntryEventImpl create(LocalRegion region, Operation op, Object key,
+      @Retained(ENTRY_EVENT_NEW_VALUE) Object newValue, Object callbackArgument, boolean originRemote,
+      DistributedMember distributedMember, boolean generateCallbacks,
+      EventID eventID) {
+    EntryEventImpl entryEvent = new EntryEventImpl(region,op,key,newValue,callbackArgument,originRemote,distributedMember,generateCallbacks,eventID);
+    return entryEvent;
+  }
+  
+  /**
+   * Creates and returns an EntryEventImpl.  Generates and assigns a bucket id to the
+   * EntryEventImpl if the region parameter is a PartitionedRegion.
+   * 
+   * {@link EntryEventImpl#EntryEventImpl(LocalRegion, Operation, Object, boolean, DistributedMember, boolean, boolean)}
+   */
+  @Retained
+  public static EntryEventImpl create(LocalRegion region, Operation op, Object key,
+      boolean originRemote, DistributedMember distributedMember,
+      boolean generateCallbacks, boolean fromRILocalDestroy) {
+    EntryEventImpl entryEvent = new EntryEventImpl(region,op,key,originRemote,distributedMember,generateCallbacks,fromRILocalDestroy);
+    return entryEvent;
+  }  
+  
+  /**
+   * Creates and returns an EntryEventImpl.  Generates and assigns a bucket id to the
+   * EntryEventImpl if the region parameter is a PartitionedRegion.
+   * 
+   * This creator does not specify the oldValue as this will be filled in later as part of an
+   * operation on the region, or lets it default to null.
+   * 
+   * {@link EntryEventImpl#EntryEventImpl(LocalRegion, Operation, Object, Object, Object, boolean, DistributedMember, boolean, boolean)}
+   */
+  @Retained
+  public static EntryEventImpl create(final LocalRegion region,
+      Operation op, Object key, @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal,
+      Object callbackArgument,
+      boolean originRemote, DistributedMember distributedMember,
+      boolean generateCallbacks, boolean initializeId)  {
+    EntryEventImpl entryEvent = new EntryEventImpl(region,op,key,newVal,callbackArgument,
+        originRemote,distributedMember,generateCallbacks,initializeId);
+    return entryEvent;
+  }
+  
+  /**
    * Creates a PutAllEvent given the distributed operation, the region, and the
    * entry data.
    *
    * @since 5.0
    */
+  @Retained
   static EntryEventImpl createPutAllEvent(
       DistributedPutAllOperation putAllOp, LocalRegion region,
-      Operation entryOp, Object entryKey, Object entryNewValue)
+      Operation entryOp, Object entryKey, @Retained(ENTRY_EVENT_NEW_VALUE) Object entryNewValue)
   {
     EntryEventImpl e;
     if (putAllOp != null) {
       EntryEventImpl event = putAllOp.getBaseEvent();
       if (event.isBridgeEvent()) {
-        e = new EntryEventImpl(region, entryOp, entryKey, entryNewValue,
+        e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue,
             event.getRawCallbackArgument(), false, event.distributedMember,
             event.isGenerateCallbacks());
         e.setContext(event.getContext());
       } else {
-        e = new EntryEventImpl(region, entryOp, entryKey, entryNewValue, event.getCallbackArgument(),
+        e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue, event.getCallbackArgument(),
             false, region.getMyId(), event.isGenerateCallbacks());
       }
       
     } else {
-      e = new EntryEventImpl(region, entryOp, entryKey, entryNewValue, null,
+      e = EntryEventImpl.create(region, entryOp, entryKey, entryNewValue, null,
           false, region.getMyId(), true);
     }
     
@@ -408,7 +508,7 @@ public class EntryEventImpl
     return e;
   }
   
-  static EntryEventImpl createRemoveAllEvent(
+  protected static EntryEventImpl createRemoveAllEvent(
       DistributedRemoveAllOperation op, 
       LocalRegion region,
       Object entryKey) {
@@ -417,17 +517,17 @@ public class EntryEventImpl
     if (op != null) {
       EntryEventImpl event = op.getBaseEvent();
       if (event.isBridgeEvent()) {
-        e = new EntryEventImpl(region, entryOp, entryKey, null,
+        e = EntryEventImpl.create(region, entryOp, entryKey, null,
             event.getRawCallbackArgument(), false, event.distributedMember,
             event.isGenerateCallbacks());
         e.setContext(event.getContext());
       } else {
-        e = new EntryEventImpl(region, entryOp, entryKey, null, event.getCallbackArgument(),
+        e = EntryEventImpl.create(region, entryOp, entryKey, null, event.getCallbackArgument(),
             false, region.getMyId(), event.isGenerateCallbacks());
       }
       
     } else {
-      e = new EntryEventImpl(region, entryOp, entryKey, null, null,
+      e = EntryEventImpl.create(region, entryOp, entryKey, null, null,
           false, region.getMyId(), true);
     }
     
@@ -585,6 +685,22 @@ public class EntryEventImpl
     return this.op.isEviction();
   }
 
+  public final boolean isCustomEviction() {
+    return this.isCustomEviction;
+  }
+  
+  public final void setCustomEviction(boolean customEvict) {
+    this.isCustomEviction = customEvict;
+  }
+  
+  public final void setEvicted() {
+    this.isEvicted = true;
+  }
+  
+  public final boolean isEvicted() {
+    return this.isEvicted;
+  }
+  
   public final boolean isPendingSecondaryExpireDestroy() {
     return this.isPendingSecondaryExpireDestroy;
   }
@@ -697,13 +813,12 @@ public class EntryEventImpl
    *
    * @return the value in the cache prior to this event.
    */
-  public Object getOldValue()
-  {
+  public final Object getOldValue() {
     try {
       if (isOriginRemote() && this.region.isProxy()) {
         return null;
       }
-      Object ov = basicGetOldValue();
+      @Unretained Object ov = basicGetOldValue();
       if (ov == null) {
         return null;
       } else if (ov == Token.NOT_AVAILABLE) {
@@ -711,6 +826,10 @@ public class EntryEventImpl
       }
       boolean doCopyOnRead = getRegion().isCopyOnRead();
       if (ov != null) {
+        if (ov instanceof StoredObject) {
+          // TODO OFFHEAP: returns off-heap PdxInstance
+          return ((StoredObject) ov).getValueAsDeserializedHeapObject();
+        } else
         if (ov instanceof CachedDeserializable) {
           CachedDeserializable cd = (CachedDeserializable)ov;
           if (doCopyOnRead) {
@@ -736,48 +855,210 @@ public class EntryEventImpl
   }
 
   /**
-   * Returns the new value as is; no deserialization or copying.
+   * Like getRawNewValue except that if the result is an off-heap reference then copy it to the heap.
    * ALERT: If there is a Delta, returns that, not the (applied) new value.
+   * TODO OFFHEAP: to prevent the heap copy use getRawNewValue instead
    */
-  public Object getRawNewValue() {
+  public final Object getRawNewValueAsHeapObject() {
     if (this.delta != null) {
       return this.delta;
     }
+    return OffHeapHelper.getHeapForm(OffHeapHelper.copyIfNeeded(basicGetNewValue()));
+  }
+  
+  /**
+   * If new value is a Delta return it.
+   * Else if new value is off-heap return the StoredObject form (unretained OFF_HEAP_REFERENCE). 
+   * Its refcount is not incremented by this call, so the returned object can only be safely used for the lifetime of the EntryEventImpl instance that returned the value.
+   * Else return the raw form.
+   */
+  @Unretained(ENTRY_EVENT_NEW_VALUE)
+  public final Object getRawNewValue() {
+    if (this.delta != null) return this.delta;
+    return basicGetNewValue();
+  }
+
+  @Unretained(ENTRY_EVENT_NEW_VALUE)
+  public Object getValue() {
     return basicGetNewValue();
   }
   
-  private void basicSetNewValue(Object v) {
+  /**
+   * Returns the delta that represents the new value; null if no delta.
+   * @return the delta that represents the new value; null if no delta.
+   */
+  public final Delta getDeltaNewValue() {
+    return this.delta;
+  }
+
+  /**
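+   * Applies the delta and, if applyDelta succeeds, returns the resulting new value (copied when it is the old value instance and doCopyOnRead is true); otherwise returns null.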
+   */
+  private Object applyDeltaWithCopyOnRead(boolean doCopyOnRead) {
+    //try {
+      if (applyDelta(true)) {
+        Object applied = basicGetNewValue();
+        // if applyDelta returns true then newValue should not be off-heap
+        assert !(applied instanceof StoredObject);
+        if (applied == this.oldValue && doCopyOnRead) {
+          applied = CopyHelper.copy(applied);
+        }
+        return applied;
+      }
+    //} catch (EntryNotFoundException ex) {
+      // only (broken) product code has the opportunity to call this before
+      // this.oldValue is set. If oldValue is not set yet, then
+      // we most likely haven't synchronized on the region entry yet.
+      // (If we have, then make sure oldValue is set before
+      // calling this method).
+      //throw new AssertionError("too early to call getNewValue");
+    //}
+    return null;
+  }
+
+  @Released(ENTRY_EVENT_NEW_VALUE)
+  protected void basicSetNewValue(@Retained(ENTRY_EVENT_NEW_VALUE) Object v) {
     if (v == this.newValue) return;
+    if (this.offHeapOk) {
+      OffHeapHelper.releaseAndTrackOwner(this.newValue, this);
+    }
+    if (v instanceof Chunk) {
+      SimpleMemoryAllocatorImpl.setReferenceCountOwner(this);
+      if (!((Chunk) v).retain()) {
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+        this.newValue = null;
+        return;
+      }
+      SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+    }
     this.newValue = v;
     this.cachedSerializedNewValue = null;
   }
-  protected Object basicGetNewValue() {
+  /**
+   * Returns true if this event has a reference to an off-heap new or old value.
+   */
+  public boolean hasOffHeapValue() {
+    return (this.newValue instanceof Chunk) || (this.oldValue instanceof Chunk);
+  }
+  
+  @Unretained
+  protected final Object basicGetNewValue() {
     Object result = this.newValue;
+    if (!this.offHeapOk && result instanceof Chunk) {
+      //this.region.getCache().getLogger().info("DEBUG new value already freed " + System.identityHashCode(result));
+      throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
+    }
     return result;
   }
-  private void basicSetOldValue(Object v, boolean incRefCount) {
-    if (v == this.oldValue) return;
+  
+  private class OldValueOwner {
+    private EntryEventImpl getEvent() {
+      return EntryEventImpl.this;
+    }
+    @Override
+    public int hashCode() {
+      return getEvent().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof OldValueOwner) {
+        return getEvent().equals(((OldValueOwner) obj).getEvent());
+      } else {
+        return false;
+      }
+    }
+    @Override
+    public String toString() {
+      return "OldValueOwner " + getEvent().toString();
+    }
+  }
+
+  /**
+   * Note if v might be an off-heap reference that you did not retain for this EntryEventImpl
+   * then call retainAndSetOldValue instead of this method.
+   * @param v the caller should have already retained this off-heap reference.
+   */
+  @Released(ENTRY_EVENT_OLD_VALUE)
+  private void basicSetOldValue(@Unretained(ENTRY_EVENT_OLD_VALUE) Object v) {
+    @Released final Object curOldValue = this.oldValue;
+    if (v == curOldValue) return;
+    if (this.offHeapOk) {
+      if (curOldValue instanceof Chunk) {
+        if (SimpleMemoryAllocatorImpl.trackReferenceCounts()) {
+          OffHeapHelper.releaseAndTrackOwner(curOldValue, new OldValueOwner());
+        } else {
+          OffHeapHelper.release(curOldValue);
+        }
+      }
+    }
+    
     this.oldValue = v;
   }
+
+  @Released(ENTRY_EVENT_OLD_VALUE)
+  private void retainAndSetOldValue(@Retained(ENTRY_EVENT_OLD_VALUE) Object v) {
+    if (v == this.oldValue) return;
+    
+    if (v instanceof Chunk) {
+      if (SimpleMemoryAllocatorImpl.trackReferenceCounts()) {
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(new OldValueOwner());
+        boolean couldNotRetain = (!((Chunk) v).retain());
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+        if (couldNotRetain) {
+          this.oldValue = null;
+          return;
+        }
+      } else {
+        if (!((Chunk) v).retain()) {
+          this.oldValue = null;
+          return;
+        }
+      }
+    }
+    basicSetOldValue(v);
+  }
+
+  @Unretained(ENTRY_EVENT_OLD_VALUE)
   private Object basicGetOldValue() {
+    @Unretained(ENTRY_EVENT_OLD_VALUE)
     Object result = this.oldValue;
+    if (!this.offHeapOk && result instanceof Chunk) {
+      //this.region.getCache().getLogger().info("DEBUG old value already freed " + System.identityHashCode(result));
+      throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
+    }
     return result;
   }
 
-  public Object getRawOldValue() {
+  /**
+   * Like getRawOldValue except that if the result is an off-heap reference then copy it to the heap.
+   * To avoid the heap copy use getRawOldValue instead.
+   */
+  public final Object getRawOldValueAsHeapObject() {
+    return OffHeapHelper.getHeapForm(OffHeapHelper.copyIfNeeded(basicGetOldValue()));
+  }
+  /**
+   * If the old value is off-heap return the StoredObject form (unretained OFF_HEAP_REFERENCE). 
+   * Its refcount is not incremented by this call, so the returned object can only be safely used for the lifetime of the EntryEventImpl instance that returned the value.
+   * Else return the raw form.
+   */
+  @Unretained
+  public final Object getRawOldValue() {
     return basicGetOldValue();
   }
-
   /**
-   * Get the new value preferring the real value over delta if newValue has
-   * already been calculated. This is to correct size calculations when using
-   * delta (used by SQLFabric). See bug #40866.
+   * Just like getRawOldValue except if the raw old value is off-heap deserialize it.
+   * Note that in some cases sqlf ignores the request to deserialize.
    */
-  public Object getNewValueForSize() {
-    applyDelta(false);
-    return basicGetNewValue();
+  @Unretained(ENTRY_EVENT_OLD_VALUE)
+  public final Object getOldValueAsOffHeapDeserializedOrRaw() {
+    Object result = basicGetOldValue();
+    if (result instanceof StoredObject) {
+      result = ((StoredObject) result).getDeserializedForReading();
+    }
+    return AbstractRegion.handleNotAvailable(result); // fixes 49499
   }
-  
+
   /**
    * Added this function to expose isCopyOnRead function to the
    * child classes of EntryEventImpl  
@@ -792,10 +1073,12 @@ public class EntryEventImpl
    *
    * @return the value in the cache after this event.
    */
-  public Object getNewValue() {
+  public final Object getNewValue() {
+    
     boolean doCopyOnRead = getRegion().isCopyOnRead();
     try {
       if (applyDelta(true)) {
+        @Unretained(ENTRY_EVENT_NEW_VALUE)
         Object applied = basicGetNewValue();
         if (applied == this.oldValue && doCopyOnRead) {
           applied = CopyHelper.copy(applied);
@@ -816,10 +1099,14 @@ public class EntryEventImpl
         // I'm not sure this can even happen
         return AbstractRegion.handleNotAvailable(nv);
       }
+      if (nv instanceof StoredObject) {
+        // TODO OFFHEAP currently we copy offheap new value to the heap here. Check callers of this method to see if they can be optimized to use offheap values.
+        // TODO OFFHEAP: returns off-heap PdxInstance
+        return ((StoredObject) nv).getValueAsDeserializedHeapObject();
+      } else
       if (nv instanceof CachedDeserializable) {
         CachedDeserializable cd = (CachedDeserializable)nv;
         Object v = null;
-        // TODO OFFHEAP currently we copy offheap new value to the heap here. Check callers of this method to see if they can be optimized to use offheap values.
         if (doCopyOnRead) {
           v = cd.getDeserializedWritableCopy(this.region, this.re);
         } else {
@@ -869,7 +1156,7 @@ public class EntryEventImpl
   }
 
   /** Set a deserialized value */
-  public void setNewValue(Object obj) {
+  public final void setNewValue(@Retained(ENTRY_EVENT_NEW_VALUE) Object obj) {
     if (obj instanceof Delta) {
       this.delta = (Delta)obj;
       basicSetNewValue(null);
@@ -879,7 +1166,6 @@ public class EntryEventImpl
     }
   }
 
-
   public TransactionId getTransactionId()
   {
     return this.txId;
@@ -973,21 +1259,27 @@ public class EntryEventImpl
   }
 
   /**
-   * Return the new value iff it is already in serialized form,
-   * otherwise return null
+   * @return null if new value is not serialized; otherwise returns a SerializedCacheValueImpl containing the new value.
    */
   public SerializedCacheValue<?> getSerializedNewValue() {
     // In the case where there is a delta that has not been applied yet,
     // do not apply it here since it would not produce a serialized new
     // value (return null instead to indicate the new value is not
     // in serialized form).
+    @Unretained(ENTRY_EVENT_NEW_VALUE)
     final Object tmp = basicGetNewValue();
     if (tmp instanceof CachedDeserializable) {
+      if (tmp instanceof StoredObject) {
+        if (!((StoredObject) tmp).isSerialized()) {
+          // TODO OFFHEAP can we handle offheap byte[] better?
+          return null;
+        }
+      }
       byte[] bytes = this.newValueBytes;
       if (bytes == null) {
         bytes = this.cachedSerializedNewValue;
       }
-      return new SerializedCacheValueImpl(getRegion(), this.re,
+      return new SerializedCacheValueImpl(this, getRegion(), this.re,
           (CachedDeserializable)tmp, bytes);
     } else {
       // Note we return null even if cachedSerializedNewValue is not null.
@@ -998,6 +1290,298 @@ public class EntryEventImpl
   }
   
   /**
+   * Implement this interface if you want to call {@link #exportNewValue}.
+   * 
+   * @author darrel
+   *
+   */
+  public interface NewValueImporter {
+    /**
+     * @return true if the importer prefers the value to be in serialized form.
+     */
+    boolean prefersNewSerialized();
+
+    /**
+     * Only return true if the importer can use the value before the event that exported it is released.
+     * If false is returned then off-heap values will be copied to the heap for the importer.
+     * @return true if the importer can deal with the value being an unretained OFF_HEAP_REFERENCE.
+     */
+    boolean isUnretainedNewReferenceOk();
+
+    /**
+     * Import a new value that is currently in object form.
+     * @param nv the new value to import; unretained if isUnretainedNewReferenceOk returns true
+     * @param isSerialized true if the imported new value represents data that needs to be serialized; false if the imported new value is a simple sequence of bytes.
+     */
+    void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, boolean isSerialized);
+
+    /**
+     * Import a new value that is currently in byte array form.
+     * @param nv the new value to import
+     * @param isSerialized true if the imported new value represents data that needs to be serialized; false if the imported new value is a simple sequence of bytes.
+     */
+    void importNewBytes(byte[] nv, boolean isSerialized);
+  }
+  
+  /**
+   * Export the event's new value to the given importer.
+   */
+  public final void exportNewValue(NewValueImporter importer) {
+    final boolean prefersSerialized = importer.prefersNewSerialized();
+    if (prefersSerialized) {
+      if (getCachedSerializedNewValue() != null) {
+        importer.importNewBytes(getCachedSerializedNewValue(), true);
+        return;
+      } else {
+      if (this.newValueBytes != null && this.newValue instanceof CachedDeserializable) {
+        importer.importNewBytes(this.newValueBytes, true);
+        return;
+      }
+      }
+    }
+    @Unretained(ENTRY_EVENT_NEW_VALUE) 
+    final Object nv = getRawNewValue();
+    if (nv instanceof StoredObject) {
+      @Unretained(ENTRY_EVENT_NEW_VALUE)
+      final StoredObject so = (StoredObject) nv;
+      final boolean isSerialized = so.isSerialized();
+      if (nv instanceof Chunk) {
+        if (importer.isUnretainedNewReferenceOk()) {
+          importer.importNewObject(nv, isSerialized);
+        } else {
+          if (!isSerialized || prefersSerialized) {
+            byte[] bytes = so.getValueAsHeapByteArray();
+            importer.importNewBytes(bytes, isSerialized);
+            if (isSerialized) {
+              setCachedSerializedNewValue(bytes);
+            }
+          } else {
+            // TODO OFFHEAP: returns off-heap PdxInstance which is not ok since isUnretainedNewReferenceOk returned false
+            importer.importNewObject(so.getValueAsDeserializedHeapObject(), true);
+          }
+        }
+      } else {
+        importer.importNewObject(nv, isSerialized);
+      }
+    } else if (nv instanceof byte[]) {
+      importer.importNewBytes((byte[])nv, false);
+    } else if (nv instanceof CachedDeserializable) {
+      CachedDeserializable cd = (CachedDeserializable) nv;
+      Object cdV = cd.getValue();
+      if (cdV instanceof byte[]) {
+        importer.importNewBytes((byte[]) cdV, true);
+        setCachedSerializedNewValue((byte[]) cdV);
+      } else {
+        importer.importNewObject(cdV, true);
+      }
+    } else {
+      importer.importNewObject(nv, true);
+    }
+  }
+  /**
+   * Implement this interface if you want to call {@link #exportOldValue}.
+   * 
+   * @author darrel
+   *
+   */
+  public interface OldValueImporter {
+    /**
+     * @return true if the importer prefers the value to be in serialized form.
+     */
+    boolean prefersOldSerialized();
+
+    /**
+     * Only return true if the importer can use the value before the event that exported it is released.
+     * @return true if the importer can deal with the value being an unretained OFF_HEAP_REFERENCE.
+     */
+    boolean isUnretainedOldReferenceOk();
+    
+    /**
+     * @return return true if you want the old value to possibly be an instanceof CachedDeserializable; false if you want the value contained in a CachedDeserializable.
+     */
+    boolean isCachedDeserializableValueOk();
+
+    /**
+     * Import an old value that is currently in object form.
+     * @param ov the old value to import; unretained if isUnretainedOldReferenceOk returns true
+     * @param isSerialized true if the imported old value represents data that needs to be serialized; false if the imported old value is a simple sequence of bytes.
+     */
+    void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized);
+
+    /**
+     * Import an old value that is currently in byte array form.
+     * @param ov the old value to import
+     * @param isSerialized true if the imported old value represents data that needs to be serialized; false if the imported old value is a simple sequence of bytes.
+     */
+    void importOldBytes(byte[] ov, boolean isSerialized);
+  }
+  
+  /**
+   * Export the event's old value to the given importer.
+   */
+  public final void exportOldValue(OldValueImporter importer) {
+    final boolean prefersSerialized = importer.prefersOldSerialized();
+    if (prefersSerialized) {
+      if (this.oldValueBytes != null && this.oldValue instanceof CachedDeserializable) {
+        importer.importOldBytes(this.oldValueBytes, true);
+        return;
+      }
+    }
+    @Unretained(ENTRY_EVENT_OLD_VALUE)
+    final Object ov = getRawOldValue();
+    if (ov instanceof StoredObject) {
+      final StoredObject so = (StoredObject) ov;
+      final boolean isSerialized = so.isSerialized();
+      if (ov instanceof Chunk) {
+        if (importer.isUnretainedOldReferenceOk()) {
+          importer.importOldObject(ov, isSerialized);
+        } else {
+          if (!isSerialized || prefersSerialized) {
+            importer.importOldBytes(so.getValueAsHeapByteArray(), isSerialized);
+          } else {
+            // TODO OFFHEAP: returns off-heap PdxInstance which is not ok since isUnretainedOldReferenceOk returned false
+           importer.importOldObject(so.getValueAsDeserializedHeapObject(), true);
+          }
+        }
+      } else {
+        importer.importOldObject(ov, isSerialized);
+      }
+    } else if (ov instanceof byte[]) {
+      importer.importOldBytes((byte[])ov, false);
+    } else if (!importer.isCachedDeserializableValueOk() && ov instanceof CachedDeserializable) {
+      CachedDeserializable cd = (CachedDeserializable) ov;
+      Object cdV = cd.getValue();
+      if (cdV instanceof byte[]) {
+        importer.importOldBytes((byte[]) cdV, true);
+      } else {
+        importer.importOldObject(cdV, true);
+      }
+    } else {
+      importer.importOldObject(ov, true);
+    }
+  }
+
+  /**
+   * If applyDelta is true then first attempt to apply a delta (if we have one) and return the value.
+   * Else if new value is a Delta return it.
+   * Else if new value is off-heap return the StoredObject form (unretained OFF_HEAP_REFERENCE). 
+   * Its refcount is not incremented by this call, so the returned object can only be safely used for the lifetime of the EntryEventImpl instance that returned the value.
+   * Else return the raw form.
+   */
+  @Unretained(ENTRY_EVENT_NEW_VALUE)
+  public final Object getRawNewValue(boolean applyDelta) {
+    if (applyDelta) {
+      boolean doCopyOnRead = getRegion().isCopyOnRead();
+      Object newValueWithDelta = applyDeltaWithCopyOnRead(doCopyOnRead);
+      if (newValueWithDelta != null) {
+        return newValueWithDelta;
+      }
+      // if applyDelta is true and we have already applied the delta then
+      // just return the applied value instead of the delta object.
+      @Unretained(ENTRY_EVENT_NEW_VALUE)
+      Object newValue = basicGetNewValue();
+      if (newValue != null) return newValue;
+    }
+    return getRawNewValue();
+  }
+  /**
+   * Just like getRawNewValue(true) except if the raw new value is off-heap deserialize it.
+   * Note that in some cases sqlf ignores the request to deserialize.
+   */
+  @Unretained(ENTRY_EVENT_NEW_VALUE)
+  public final Object getNewValueAsOffHeapDeserializedOrRaw() {
+    Object result = getRawNewValue(true);
+    if (result instanceof StoredObject) {
+      result = ((StoredObject) result).getDeserializedForReading();
+    }
+    return AbstractRegion.handleNotAvailable(result); // fixes 49499
+  }
+
+  /**
+   * If the new value is stored off-heap return a retained OFF_HEAP_REFERENCE (caller must release).
+   * @return a retained OFF_HEAP_REFERENCE if the new value is off-heap; otherwise returns null
+   */
+  @Retained(ENTRY_EVENT_NEW_VALUE)
+  public StoredObject getOffHeapNewValue() {
+    final Object tmp = basicGetNewValue();
+    if (tmp instanceof StoredObject) {
+      StoredObject result = (StoredObject) tmp;
+      if (!result.retain()) {
+        return null;
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+  
+  /**
+   * If the old value is stored off-heap return a retained OFF_HEAP_REFERENCE (caller must release).
+   * @return a retained OFF_HEAP_REFERENCE if the old value is off-heap; otherwise returns null
+   */
+  @Retained(ENTRY_EVENT_OLD_VALUE)
+  public StoredObject getOffHeapOldValue() {
+    final Object tmp = basicGetOldValue();
+    if (tmp instanceof StoredObject) {
+      StoredObject result = (StoredObject) tmp;
+      if (!result.retain()) {
+        return null;
+      }
+      return result;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Result may be unretained because sqlf getDeserializedForReading returns unretained.
+   */
+  public final Object getDeserializedValue() {
+    if (this.delta == null) {
+      final Object val = basicGetNewValue();
+      if (val instanceof StoredObject) {
+        // TODO OFFHEAP: returns off-heap PdxInstance
+        return ((StoredObject) val).getValueAsDeserializedHeapObject();
+      } else 
+      if (val instanceof CachedDeserializable) {
+        return ((CachedDeserializable)val).getDeserializedForReading();
+      }
+      else {
+        return val;
+      }
+    }
+    else {
+      return this.delta;
+    }
+  }
+
+  public final byte[] getSerializedValue() {
+    if (this.newValueBytes == null) {
+      final Object val;
+      if (this.delta == null) {
+        val = basicGetNewValue();
+        if (val instanceof byte[]) {
+          return (byte[])val;
+        }
+        else if (val instanceof CachedDeserializable) {
+          return ((CachedDeserializable)val).getSerializedValue();
+        }
+      }
+      else {
+        val = this.delta;
+      }
+      try {
+        return CacheServerHelper.serialize(val);
+      } catch (IOException ioe) {
+        throw new GemFireIOException("unexpected exception", ioe);
+      }
+    }
+    else {
+      return this.newValueBytes;
+    }
+  }
+
+  /**
    * Forces this entry's new value to be in serialized form.
    * @since 5.0.2
    */
@@ -1008,8 +1592,8 @@ public class EntryEventImpl
   /**
    * @param isSynced true if RegionEntry currently under synchronization
    */
-  private void makeSerializedNewValue(boolean isSynced) {
-    Object obj = this.newValue;
+  private final void makeSerializedNewValue(boolean isSynced) {
+    Object obj = basicGetNewValue();
 
     // ezoerner:20080611 In the case where there is an unapplied
     // delta, do not apply the delta or serialize yet unless entry is
@@ -1073,15 +1657,40 @@ public class EntryEventImpl
     return this.cachedSerializedNewValue;
   }
 
-  public void setSerializedNewValue(byte[] serializedValue) {
+  public final void setSerializedNewValue(byte[] serializedValue) {
+    Object newVal = null;
+    if (serializedValue != null) {
+      if (CachedDeserializableFactory.preferObject()) {
+        newVal = deserialize(serializedValue);
+      } else {
+        newVal = CachedDeserializableFactory.create(serializedValue);
+      }
+      if (newVal instanceof Delta) {
+        this.delta = (Delta)newVal;
+        newVal = null;
+        // We need the newValueBytes field and the newValue field to be in sync.
+        // In the case of non-null delta set both fields to null.
+        serializedValue = null;
+      }
+    }
     this.newValueBytes = serializedValue;
-    basicSetNewValue(CachedDeserializableFactory.create(serializedValue));
+    basicSetNewValue(newVal);
     this.cachedSerializedNewValue = serializedValue;
   }
 
   public void setSerializedOldValue(byte[] serializedOldValue){
     this.oldValueBytes = serializedOldValue;
-    basicSetOldValue(CachedDeserializableFactory.create(serializedOldValue), true);
+    final Object ov;
+    if (CachedDeserializableFactory.preferObject()) {
+      ov = deserialize(serializedOldValue);
+    }
+    else if (serializedOldValue != null) {
+      ov = CachedDeserializableFactory.create(serializedOldValue);
+    }
+    else {
+      ov = null;
+    }
+    retainAndSetOldValue(ov);
   }
 
   /**
@@ -1111,12 +1720,29 @@ public class EntryEventImpl
         if (requireOldValue ||
             EVENT_OLD_VALUE
             || this.region instanceof HARegion // fix for bug 37909
+            || GemFireCacheImpl.sqlfSystem()
             ) {
-          Object ov = reentry._getValueUse(owner, true); // TODO:KIRK:OK 
+          @Retained Object ov;
+          if (SimpleMemoryAllocatorImpl.trackReferenceCounts()) {
+            SimpleMemoryAllocatorImpl.setReferenceCountOwner(new OldValueOwner());
+            if (GemFireCacheImpl.sqlfSystem()) {
+              ov = reentry.getValueOffHeapOrDiskWithoutFaultIn(this.region);
+            } else {
+              ov = reentry._getValueRetain(owner, true);
+            }
+            SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+          } else {
+            if (GemFireCacheImpl.sqlfSystem()) {
+              ov = reentry.getValueOffHeapOrDiskWithoutFaultIn(this.region);
+            } else {
+              ov = reentry._getValueRetain(owner, true);
+            }
+          }
           if (ov == null) ov = Token.NOT_AVAILABLE;
-          basicSetOldValue(ov, false);
+          // ov has already been retained so call basicSetOldValue instead of retainAndSetOldValue
+          basicSetOldValue(ov);
         } else {
-          basicSetOldValue(Token.NOT_AVAILABLE, true);
+          basicSetOldValue(Token.NOT_AVAILABLE);
         }
       }
     }
@@ -1159,7 +1785,7 @@ public class EntryEventImpl
   void putNewEntry(final LocalRegion owner, final RegionEntry reentry)
       throws RegionClearedException {
     if (!this.op.guaranteesOldValue()) {  // preserves oldValue for CM ops in clients
-      basicSetOldValue(null, true);
+      basicSetOldValue(null);
     }
     makeCreate();
     setNewValueInRegion(owner, reentry, null);
@@ -1173,6 +1799,7 @@ public class EntryEventImpl
     return this.re;
   }
 
+  @Retained(ENTRY_EVENT_NEW_VALUE)
   private void setNewValueInRegion(final LocalRegion owner,
       final RegionEntry reentry, Object oldValueForDelta) throws RegionClearedException {
     
@@ -1222,10 +1849,23 @@ public class EntryEventImpl
       }
       v = CachedDeserializableFactory.create(v, vSize);
       basicSetNewValue(v);
-    } 
- 
-    v = AbstractRegionMap.prepareValueForCache(this.region, v, this);
+    }
 
+    Object preparedV = reentry.prepareValueForCache(this.region, v, this, this.hasDelta());
+    if (preparedV != v) {
+      v = preparedV;
+      if (v instanceof Chunk) {
+        if (!((Chunk) v).isCompressed()) { // fix bug 52109
+          // If we put it off heap and it is not compressed then remember that value.
+          // Otherwise we want to remember the decompressed value in the event.
+          basicSetNewValue(v);
+        }
+      }
+    }
+    boolean isTombstone = (v == Token.TOMBSTONE);
+    boolean success = false;
+    boolean calledSetValue = false;
+    try {
     setNewValueBucketSize(owner, v);
     
     // ezoerner:20081030 
@@ -1237,8 +1877,6 @@ public class EntryEventImpl
     // for the purpose of index maintenance since invalid entries are not
     // indexed.
     
-    boolean isTombstone = (v == Token.TOMBSTONE);
-    
     if ((this.op.isUpdate() && !reentry.isInvalid()) || this.op.isInvalidate()) {
       IndexManager idxManager = IndexUtils.getIndexManager(this.region, false);
       if (idxManager != null) {
@@ -1256,7 +1894,6 @@ public class EntryEventImpl
     }
     final IndexUpdater indexUpdater = this.region.getIndexUpdater();
     if (indexUpdater != null) {
-      boolean success = false;
       final LocalRegion indexRegion;
       if (owner != null) {
         indexRegion = owner;
@@ -1266,6 +1903,7 @@ public class EntryEventImpl
       }
       try {
         indexUpdater.onEvent(indexRegion, this, reentry);
+        calledSetValue = true;
         reentry.setValueWithTombstoneCheck(v, this); // already called prepareValueForCache
         success = true;
       } finally {
@@ -1273,7 +1911,14 @@ public class EntryEventImpl
       }
     }
     else {
+      calledSetValue = true;
       reentry.setValueWithTombstoneCheck(v, this); // already called prepareValueForCache
+      success = true;
+    }
+    } finally {
+      if (!success && reentry instanceof OffHeapRegionEntry && v instanceof Chunk) {
+        OffHeapRegionEntryHelper.releaseEntry((OffHeapRegionEntry)reentry, (Chunk)v);
+      }      
     }
     if (logger.isTraceEnabled()) {
       if (v instanceof CachedDeserializable) {
@@ -1384,20 +2029,19 @@ public class EntryEventImpl
     }
   }
 
-  void setTXEntryOldValue(Object oldVal, boolean mustBeAvailable)
-  {
-    if (Token.isInvalidOrRemoved(oldVal) || oldVal == null) {
-      basicSetOldValue(null, true);
+  void setTXEntryOldValue(Object oldVal, boolean mustBeAvailable) {
+    if (Token.isInvalidOrRemoved(oldVal)) {
+      oldVal = null;
     }
     else {
-      if (mustBeAvailable) {
-        basicSetOldValue(oldVal, true);
-      } else if (EVENT_OLD_VALUE) {
-        basicSetOldValue(oldVal, true);
-      } else {
-        basicSetOldValue(Token.NOT_AVAILABLE, true);
+      if (mustBeAvailable || oldVal == null || EVENT_OLD_VALUE) {
+        // set oldValue to oldVal
+      }
+      else {
+        oldVal = Token.NOT_AVAILABLE;
       }
     }
+    retainAndSetOldValue(oldVal);
   }
 
   void putValueTXEntry(final TXEntryState tx) {
@@ -1418,7 +2062,7 @@ public class EntryEventImpl
     if (this.op != Operation.LOCAL_INVALIDATE
         && this.op != Operation.LOCAL_DESTROY) {
       // fix for bug 34387
-      tx.setPendingValue(v);
+      tx.setPendingValue(OffHeapHelper.copyIfNeeded(v)); // TODO OFFHEAP optimize
     }
     tx.setCallbackArgument(getCallbackArgument());
   }
@@ -1429,8 +2073,14 @@ public class EntryEventImpl
     try {
       RegionEntry re = this.region.getRegionEntry(getKey());
       if (re == null) return false;
-      Object v = re._getValueUse(this.region, true);
+      SimpleMemoryAllocatorImpl.skipRefCountTracking();
+      Object v = re._getValueRetain(this.region, true);
+      SimpleMemoryAllocatorImpl.unskipRefCountTracking();
+      try {
         return setOldValue(v);
+      } finally {
+        OffHeapHelper.releaseWithNoTracking(v);
+      }
     }
     catch (EntryNotFoundException ex) {
       return false;
@@ -1445,7 +2095,7 @@ public class EntryEventImpl
 
   void setOldValueDestroyedToken()
   {
-    basicSetOldValue(Token.DESTROYED, true);
+    basicSetOldValue(Token.DESTROYED);
   }
 
   /**
@@ -1468,20 +2118,21 @@ public class EntryEventImpl
     }
     else {
       if (Token.isInvalid(v)) {
-        basicSetOldValue(null, true);
+        v = null;
       }
       else {
         if (force ||
             (this.region instanceof HARegion) // fix for bug 37909
             ) {
-          basicSetOldValue(v, true);
+          // set oldValue to "v".
         } else if (EVENT_OLD_VALUE) {
           // TODO Rusty add compression support here
-          basicSetOldValue(v, true);
+          // set oldValue to "v".
         } else {
-          basicSetOldValue(Token.NOT_AVAILABLE, true);
+          v = Token.NOT_AVAILABLE;
         }
       }
+      retainAndSetOldValue(v);
       return true;
     }
   }
@@ -1493,17 +2144,18 @@ public class EntryEventImpl
   public void setConcurrentMapOldValue(Object v) {
     if (Token.isRemoved(v)) {
       return;
-    } else if (Token.isInvalid(v)) {
-      basicSetOldValue(null, true);
     } else {
-      basicSetOldValue(v, true);
+      if (Token.isInvalid(v)) {
+        v = null;
+      }   
+      retainAndSetOldValue(v);
     }
   }
 
   /** Return true if new value available */
   public boolean hasNewValue() {
     Object tmp = this.newValue;
-    if (tmp == null && this.delta != null) {
+    if (tmp == null && hasDelta()) {
       // ???:ezoerner:20080611 what if applying the delta would produce
       // null or (strangely) NOT_AVAILABLE.. do we need to apply it here to
       // find out?
@@ -1512,8 +2164,11 @@ public class EntryEventImpl
     return  tmp != null && tmp != Token.NOT_AVAILABLE;
   }
 
-  final public boolean hasOldValue() {
-    return this.oldValue != null  && basicGetOldValue() != Token.NOT_AVAILABLE;
+  public final boolean hasOldValue() {
+    return this.oldValue != null && this.oldValue != Token.NOT_AVAILABLE;
+  }
+  public final boolean isOldValueAToken() {
+    return this.oldValue instanceof Token;
   }
 
   /**
@@ -1535,7 +2190,7 @@ public class EntryEventImpl
   }
   
   public void oldValueNotAvailable() {
-    basicSetOldValue(Token.NOT_AVAILABLE, true);
+    basicSetOldValue(Token.NOT_AVAILABLE);
   }
 
   public static Object deserialize(byte[] bytes) {
@@ -1559,6 +2214,25 @@ public class EntryEventImpl
   }
 
   /**
+   * If a PdxInstance is returned then it will have an unretained reference
+   * to Chunk's off-heap address.
+   */
+  public static @Unretained Object deserializeChunk(Chunk bytes) {
+    if (bytes == null)
+      return null;
+    try {
+      return BlobHelper.deserializeOffHeapBlob(bytes);
+    }
+    catch (IOException e) {
+      throw new SerializationException(LocalizedStrings.EntryEventImpl_AN_IOEXCEPTION_WAS_THROWN_WHILE_DESERIALIZING.toLocalizedString(), e);
+    }
+    catch (ClassNotFoundException e) {
+      // fix for bug 43602
+      throw new SerializationException(LocalizedStrings.EntryEventImpl_A_CLASSNOTFOUNDEXCEPTION_WAS_THROWN_WHILE_TRYING_TO_DESERIALIZE_CACHED_VALUE.toLocalizedString(), e);
+    }
+  }
+
+  /**
    * Serialize an object into a <code>byte[]</code>
    *
    * @throws IllegalArgumentException
@@ -1644,9 +2318,17 @@ public class EntryEventImpl
     buf.append(";key=");
     buf.append(this.getKey());
     buf.append(";oldValue=");
-    buf.append(basicGetOldValue());
+    try {
+      ArrayUtils.objectStringNonRecursive(basicGetOldValue(), buf);
+    } catch (IllegalStateException ex) {
+      buf.append("OFFHEAP_VALUE_FREED");
+    }
     buf.append(";newValue=");
-    buf.append(basicGetNewValue());
+    try {
+      ArrayUtils.objectStringNonRecursive(basicGetNewValue(), buf);
+    } catch (IllegalStateException ex) {
+      buf.append("OFFHEAP_VALUE_FREED");
+    }
     buf.append(";callbackArg=");
     buf.append(this.getRawCallbackArgument());
     buf.append(";originRemote=");
@@ -1720,6 +2402,11 @@ public class EntryEventImpl
       else {
         Object nv = basicGetNewValue();
         boolean newValueSerialized = nv instanceof CachedDeserializable;
+        if (newValueSerialized) {
+          if (nv instanceof StoredObject) {
+            newValueSerialized = ((StoredObject) nv).isSerialized();
+          }
+        }
         out.writeBoolean(newValueSerialized);
         if (newValueSerialized) {
           if (this.newValueBytes != null) {
@@ -1740,6 +2427,11 @@ public class EntryEventImpl
     {
       Object ov = basicGetOldValue();
       boolean oldValueSerialized = ov instanceof CachedDeserializable;
+      if (oldValueSerialized) {
+        if (ov instanceof StoredObject) {
+          oldValueSerialized = ((StoredObject) ov).isSerialized();
+        }
+      }
       out.writeBoolean(oldValueSerialized);
       if (oldValueSerialized) {
         if (this.oldValueBytes != null) {
@@ -1803,17 +2495,27 @@ public class EntryEventImpl
     }
   }
 
-  public SerializedCacheValue<?> getSerializedOldValue() {
+  /**
+   * @return null if old value is not serialized; otherwise returns a SerializedCacheValueImpl containing the old value.
+   */
+  public final SerializedCacheValue<?> getSerializedOldValue() {
+    @Unretained(ENTRY_EVENT_OLD_VALUE)
     final Object tmp = basicGetOldValue();
     if (tmp instanceof CachedDeserializable) {
-      return new SerializedCacheValueImpl(this.region, this.re,
+      if (tmp instanceof StoredObject) {
+        if (!((StoredObject) tmp).isSerialized()) {
+          // TODO OFFHEAP can we handle offheap byte[] better?
+          return null;
+        }
+      }
+      return new SerializedCacheValueImpl(this, this.region, this.re,
           (CachedDeserializable)tmp, this.oldValueBytes);
     }
     else {
       return null;
     }
   }
-
+  
   /**
    * Compute an estimate of the size of the new value
    * for a PR. Since PR's always store values in a cached deserializable
@@ -1937,6 +2639,22 @@ public class EntryEventImpl
   protected Long tailKey = -1L;
 
   /**
+   * Stores the next region version generated for a change to this entry
+   * by the phase-1 commit on the primary.
+   * 
+   * Not to be used in fromData or toData.
+   */
+  protected transient long nextRegionVersion = -1L;
+  
+  public void setNextRegionVersion(long regionVersion) {
+    this.nextRegionVersion = regionVersion;
+  }
+  
+  public long getNextRegionVersion() {
+    return this.nextRegionVersion;
+  }
+  
+  /**
    * Return true if this event came from a server by the client doing a get.
    * @since 5.7
    */
@@ -2076,9 +2794,11 @@ public class EntryEventImpl
   public void setOldValueForQueryProcessing() {
     RegionEntry reentry = this.region.entries.getEntry(this.getKey());
     if (reentry != null) {
-      Object v = reentry.getValueOffHeapOrDiskWithoutFaultIn(this.region);
+      @Retained Object v = reentry.getValueOffHeapOrDiskWithoutFaultIn(this.region);
       if ( !(v instanceof Token) ) {
-        basicSetOldValue(v, false);
+        // v has already been retained.
+        basicSetOldValue(v);
+        // this event now owns the retention of v.
       }
     }
   }
@@ -2129,24 +2849,21 @@ public class EntryEventImpl
     return result;
   }
 
-
-  // TODO this class is trouble for OffHeap storage.
-  // If the cd is a OffHeapCachedDeserializable we don't want to have a reference
-  // to it in this class since customers can hold onto SerializedCacheValue
-  // as long as they want and we would need to use finalization to know when
-  // they are done with it.
-  // TODO this class is also used in a number of places internally and those
-  // we want to be optimized. I think those places can be changed not to use
-  // this class.
-  public static class SerializedCacheValueImpl
+  public static final class SerializedCacheValueImpl
     implements SerializedCacheValue, CachedDeserializable, Sendable
   {
-    private final CachedDeserializable cd;
+    private final EntryEventImpl event;
+    @Unretained private final CachedDeserializable cd;
     private final Region r;
     private final RegionEntry re;
     private final byte[] serializedValue;
     
-    SerializedCacheValueImpl(Region r, RegionEntry re, CachedDeserializable cd, byte[] serializedBytes) {
+    SerializedCacheValueImpl(EntryEventImpl event, Region r, RegionEntry re, @Unretained CachedDeserializable cd, byte[] serializedBytes) {
+      if (cd instanceof Chunk) {
+        this.event = event;
+      } else {
+        this.event = null;
+      }
       this.r = r;
       this.re = re;
       this.cd = cd;
@@ -2157,33 +2874,43 @@ public class EntryEventImpl
       if(this.serializedValue != null){
         return this.serializedValue;
       }
-      return this.cd.getSerializedValue();
+      return getCd().getSerializedValue();
+    }
+    
+    private CachedDeserializable getCd() {
+      if (this.event != null && !this.event.offHeapOk) {
+        throw new IllegalStateException("Attempt to access off heap value after the EntryEvent was released.");
+      }
+      return this.cd;
     }
     
     public Object getDeserializedValue() {
       return getDeserializedValue(this.r, this.re);
     }
     public Object getDeserializedForReading() {
-      return this.cd.getDeserializedForReading();
+      // TODO OFFHEAP: returns off-heap PdxInstance
+      return OffHeapHelper.getHeapForm(getCd().getDeserializedForReading());
     }
     public Object getDeserializedWritableCopy(Region rgn, RegionEntry entry) {
-      return this.cd.getDeserializedWritableCopy(rgn, entry);
+      // TODO OFFHEAP: returns off-heap PdxInstance
+      return OffHeapHelper.getHeapForm(getCd().getDeserializedWritableCopy(rgn, entry));
     }
 
     public Object getDeserializedValue(Region rgn, RegionEntry reentry) {
-      return this.cd.getDeserializedValue(rgn, reentry);
+      // TODO OFFHEAP: returns off-heap PdxInstance
+      return OffHeapHelper.getHeapForm(getCd().getDeserializedValue(rgn, reentry));
     }
     public Object getValue() {
       if(this.serializedValue != null){
         return this.serializedValue;
       }
-      return this.cd.getValue();
+      return getCd().getValue();
     }
     public void writeValueAsByteArray(DataOutput out) throws IOException {
       if (this.serializedValue != null) {
         DataSerializer.writeByteArray(this.serializedValue, out);
       } else {
-        this.cd.writeValueAsByteArray(out);
+        getCd().writeValueAsByteArray(out);
       }
     }
     public void fillSerializedValue(BytesAndBitsForCompactor wrapper, byte userBits) {
@@ -2191,23 +2918,23 @@ public class EntryEventImpl
         wrapper.setData(this.serializedValue, userBits, this.serializedValue.length, 
                         false /* Not Reusable as it refers to underlying value */);
       } else {
-        this.cd.fillSerializedValue(wrapper, userBits);
+        getCd().fillSerializedValue(wrapper, userBits);
       }
     }
     public int getValueSizeInBytes() {
-      return this.cd.getValueSizeInBytes();
+      return getCd().getValueSizeInBytes();
     }
     public int getSizeInBytes() {
-      return this.cd.getSizeInBytes();
+      return getCd().getSizeInBytes();
     }
 
     public String getStringForm() {
-      return this.cd.getStringForm();
+      return getCd().getStringForm();
     }
 
     @Override
     public void sendTo(DataOutput out) throws IOException {
-      DataSerializer.writeObject(this.cd, out);
+      DataSerializer.writeObject(getCd(), out);
     }
   }
 //////////////////////////////////////////////////////////////////////////////////////////
@@ -2282,4 +3009,112 @@ public class EntryEventImpl
   public boolean isSingleHopPutOp() {
     return (this.causedByMessage != null && this.causedByMessage instanceof RemotePutMessage);
   }
+
+  /**
+   * True if it is ok to use old/new values that are stored off heap.
+   * False if an exception should be thrown if an attempt is made to access old/new offheap values.
+   */
+  private transient boolean offHeapOk = true;
+ 
+  @Override
+  @Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE})
+  public void release() {
+    // no-op if already released or if values cannot be off-heap
+    if (!this.offHeapOk) return;
+    // Note that this method does not set the old/new values to null but
+    // leaves them set to the off-heap value so that future calls to getOld/NewValue
+    // will fail with an exception.
+//    LocalRegion lr = getLocalRegion();
+//    if (lr != null) {
+//      if (lr.isCacheClosing()) {
+//        // to fix races during closing and recreating cache (see bug 47883) don't bother
+//        // trying to decrement reference counts if we are closing the cache.
+//        // TODO OFFHEAP: this will cause problems once offheap lives longer than a cache.
+//        this.offHeapOk = false;
+//        return;
+//      }
+//    }
+    Object ov = basicGetOldValue();
+    Object nv = basicGetNewValue();
+    this.offHeapOk = false;
+    
+    if (ov instanceof Chunk) {
+      //this.region.getCache().getLogger().info("DEBUG freeing ref to old value on " + System.identityHashCode(ov));
+      if (SimpleMemoryAllocatorImpl.trackReferenceCounts()) {
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(new OldValueOwner());
+        ((Chunk) ov).release();
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+      } else {
+        ((Chunk) ov).release();
+      }
+    }
+    OffHeapHelper.releaseAndTrackOwner(nv, this);
+  }
+
+  /**
+   * Make sure that this event will never own an off-heap value.
+   * Once this is called on an event it does not need to have release called.
+   */
+  public void disallowOffHeapValues() {
+    if (this.newValue instanceof Chunk || this.oldValue instanceof Chunk) {
+      throw new IllegalStateException("This event does not support off-heap values");
+    }
+    this.offHeapOk = false;
+  }
+  
+  /**
+   * This copies the off-heap new and/or old value to the heap.
+   * As a result the current off-heap new/old will be released.
+   * @throws IllegalStateException if the old or new value is still off-heap after the copy (for example, when called with an event for sqlf data).
+   */
+  @Released({ENTRY_EVENT_NEW_VALUE, ENTRY_EVENT_OLD_VALUE})
+  public void copyOffHeapToHeap() {
+    Object ov = basicGetOldValue();
+    if (ov instanceof Chunk) {
+      if (SimpleMemoryAllocatorImpl.trackReferenceCounts()) {
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(new OldValueOwner());
+        this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+      } else {
+        this.oldValue = OffHeapHelper.copyAndReleaseIfNeeded(ov);
+      }
+    }
+    Object nv = basicGetNewValue();
+    if (nv instanceof Chunk) {
+      SimpleMemoryAllocatorImpl.setReferenceCountOwner(this);
+      this.newValue = OffHeapHelper.copyAndReleaseIfNeeded(nv);
+      SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
+    }
+    if (this.newValue instanceof Chunk || this.oldValue instanceof Chunk) {
+      throw new IllegalStateException("event's old/new value still off-heap after calling copyOffHeapToHeap");
+    }
+    this.offHeapOk = false;
+  }
+
+  public boolean isOldValueOffHeap() {
+    return this.oldValue instanceof Chunk;
+  }
+  public final boolean isFetchFromHDFS() {
+    return fetchFromHDFS;
+  }
+
+  public final void setFetchFromHDFS(boolean fetchFromHDFS) {
+    this.fetchFromHDFS = fetchFromHDFS;
+  }
+
+  public final boolean isPutDML() {
+    return this.isPutDML;
+  }
+
+  public final void setPutDML(boolean val) {
+    this.isPutDML = val;
+  }
+
+  public final boolean isLoadedFromHDFS() {
+    return loadedFromHDFS;
+  }
+
+  public final void setLoadedFromHDFS(boolean loadedFromHDFS) {
+    this.loadedFromHDFS = loadedFromHDFS;
+  }
 }


Mime
View raw message