geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [13/51] [partial] incubator-geode git commit: SGA #2
Date Fri, 03 Jul 2015 19:21:14 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 5c7c7bc..494efaf 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -8,6 +8,7 @@
 
 package com.gemstone.gemfire.internal.cache;
 
+
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Collection;
@@ -25,6 +26,7 @@ import com.gemstone.gemfire.InvalidDeltaException;
 import com.gemstone.gemfire.cache.CacheRuntimeException;
 import com.gemstone.gemfire.cache.CacheWriter;
 import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
 import com.gemstone.gemfire.cache.DiskAccessException;
 import com.gemstone.gemfire.cache.EntryExistsException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -56,10 +58,20 @@ import com.gemstone.gemfire.internal.cache.versions.VersionHolder;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.concurrent.MapCallbackAdapter;
+import com.gemstone.gemfire.internal.concurrent.MapResult;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
 import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
 import com.gemstone.gemfire.pdx.PdxInstance;
@@ -92,6 +104,12 @@ abstract class AbstractRegionMap implements RegionMap {
   /** An internal Listener for index maintenance for SQLFabric. */
   private final IndexUpdater indexUpdater;
 
+  /**
+   * This test hook is used to force the conditions for defect 48182.
+   * This hook is used by Bug48182JUnitTest.
+   */
+  static Runnable testHookRunnableFor48182 =  null;
+  
   private RegionEntryFactory entryFactory;
   private Attributes attr;
   private transient Object owner; // the region that owns this map
@@ -122,13 +140,16 @@ abstract class AbstractRegionMap implements RegionMap {
     final GemFireCacheImpl cache;
     boolean isDisk;
     boolean withVersioning = false;
+    boolean offHeap = false;
     if (owner instanceof LocalRegion) {
       LocalRegion region = (LocalRegion)owner;
       isDisk = region.getDiskRegion() != null;
       cache = region.getGemFireCache();
       withVersioning = region.getConcurrencyChecksEnabled();
+      offHeap = region.getOffHeap();
     }
     else if (owner instanceof PlaceHolderDiskRegion) {
+      offHeap = ((PlaceHolderDiskRegion) owner).getOffHeap();
       isDisk = true;
       withVersioning = ((PlaceHolderDiskRegion)owner).getFlags().contains(
           DiskRegionFlag.IS_WITH_VERSIONING);
@@ -168,21 +189,29 @@ abstract class AbstractRegionMap implements RegionMap {
         if (isLRU) {
           if (isDisk) {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedStatsDiskLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedStatsDiskLRURegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMStatsDiskLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMStatsDiskLRURegionEntryHeap.getEntryFactory();
               }
             }
           } else {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedStatsLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedStatsLRURegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMStatsLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMStatsLRURegionEntryHeap.getEntryFactory();
               }
             }
@@ -190,21 +219,29 @@ abstract class AbstractRegionMap implements RegionMap {
         } else { // !isLRU
           if (isDisk) {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedStatsDiskRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedStatsDiskRegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMStatsDiskRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMStatsDiskRegionEntryHeap.getEntryFactory();
               }
             }
           } else {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedStatsRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedStatsRegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMStatsRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMStatsRegionEntryHeap.getEntryFactory();
               }
             }
@@ -215,22 +252,30 @@ abstract class AbstractRegionMap implements RegionMap {
         if (isLRU) {
           if (isDisk) {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedThinDiskLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedThinDiskLRURegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMThinDiskLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMThinDiskLRURegionEntryHeap.getEntryFactory();
               }
             }
           }
           else {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedThinLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedThinLRURegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMThinLRURegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMThinLRURegionEntryHeap.getEntryFactory();
               }
             }
@@ -239,22 +284,30 @@ abstract class AbstractRegionMap implements RegionMap {
         else { // !isLRU
           if (isDisk) {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedThinDiskRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedThinDiskRegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMThinDiskRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMThinDiskRegionEntryHeap.getEntryFactory();
               }
             }
           }
           else {
             if (withVersioning) {
-              {
+              if (offHeap) {
+                factory = VersionedThinRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VersionedThinRegionEntryHeap.getEntryFactory();
               }
             } else {
-              {
+              if (offHeap) {
+                factory = VMThinRegionEntryOffHeap.getEntryFactory();
+              } else {
                 factory = VMThinRegionEntryHeap.getEntryFactory();
               }
             }
@@ -274,12 +327,15 @@ abstract class AbstractRegionMap implements RegionMap {
           concurrencyLevel, isIdentityMap, entryCreator);
     }
     else {
-      return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity, loadFactor,
-          concurrencyLevel, isIdentityMap);
+      return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity,
+          loadFactor, concurrencyLevel, isIdentityMap);
     }
   }
 
   public void changeOwner(LocalRegion r) {
+    if (r == _getOwnerObject()) {
+      return;
+    }
     setOwner(r);
   }
 
@@ -332,7 +388,7 @@ abstract class AbstractRegionMap implements RegionMap {
   // this is currently used by stats and eviction
   @Override
   public int sizeInVM() {
-    return size();
+    return _getMap().size();
   }
 
   public boolean isEmpty()
@@ -350,8 +406,13 @@ abstract class AbstractRegionMap implements RegionMap {
     return (Collection)_getMap().values();
   }
 
-  public final boolean containsKey(Object key)
-  {
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public Collection<RegionEntry> regionEntriesInVM() {
+    return (Collection)_getMap().values();
+  }
+
+  public final boolean containsKey(Object key) {
     RegionEntry re = getEntry(key);
     if (re == null) {
       return false;
@@ -362,23 +423,52 @@ abstract class AbstractRegionMap implements RegionMap {
     return true;
   }
 
-  public RegionEntry getEntry(Object key)
-  {
-    return (RegionEntry)_getMap().get(key);
+  public RegionEntry getEntry(Object key) {
+    RegionEntry re = (RegionEntry)_getMap().get(key);
+    if (re != null && re.isMarkedForEviction()) {
+      // entry has been faulted in from HDFS
+      return null;
+    }
+    return re;
+  }
+
+  protected RegionEntry getEntry(EntryEventImpl event) {
+    return getEntry(event.getKey());
   }
 
+
   @Override
   public final RegionEntry getEntryInVM(Object key) {
     return (RegionEntry)_getMap().get(key);
   }
 
-  private final RegionEntry putEntryIfAbsent(Object key, RegionEntry re)
-  {
-    return (RegionEntry)_getMap().putIfAbsent(key, re);
+
+  public final RegionEntry putEntryIfAbsent(Object key, RegionEntry re) {
+    RegionEntry value = (RegionEntry)_getMap().putIfAbsent(key, re);
+    if (value == null && (re instanceof OffHeapRegionEntry) 
+        && _isOwnerALocalRegion() && _getOwner().isThisRegionBeingClosedOrDestroyed()) {
+      // prevent orphan during concurrent destroy (#48068)
+      if (_getMap().remove(key, re)) {
+        ((OffHeapRegionEntry)re).release();
+      }
+      _getOwner().checkReadiness(); // throw RegionDestroyedException
+    }
+    return value;
+  }
+
+  @Override
+  public final RegionEntry getOperationalEntryInVM(Object key) {
+    RegionEntry re = (RegionEntry)_getMap().get(key);
+    if (re != null && re.isMarkedForEviction()) {
+      // entry has been faulted in from HDFS
+      return null;
+    }
+    return re;
   }
+ 
 
   public final void removeEntry(Object key, RegionEntry re, boolean updateStat) {
-    if (re.isTombstone() && _getMap().get(key) == re) {
+    if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){
       logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
       return; // can't remove tombstones except from the tombstone sweeper
     }
@@ -394,7 +484,7 @@ abstract class AbstractRegionMap implements RegionMap {
       EntryEventImpl event, final LocalRegion owner,
       final IndexUpdater indexUpdater) {
     boolean success = false;
-    if (re.isTombstone()) {
+    if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) {
       logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
       return; // can't remove tombstones except from the tombstone sweeper
     }
@@ -402,6 +492,19 @@ abstract class AbstractRegionMap implements RegionMap {
       if (indexUpdater != null) {
         indexUpdater.onEvent(owner, event, re);
       }
+
+      //This is messy, but custom eviction calls removeEntry
+      //rather than re.destroy I think to avoid firing callbacks, etc.
+      //However, the value still needs to be set to removePhase1
+      //in order to remove the entry from disk.
+      if(event.isCustomEviction() && !re.isRemoved()) {
+        try {
+          re.removePhase1(owner, false);
+        } catch (RegionClearedException e) {
+          //that's ok, we were just trying to do evict incoming eviction
+        }
+      }
+      
       if (_getMap().remove(key, re)) {
         re.removePhase2();
         success = true;
@@ -416,7 +519,7 @@ abstract class AbstractRegionMap implements RegionMap {
     }
   }
 
-  private final void incEntryCount(int delta) {
+  protected final void incEntryCount(int delta) {
     LocalRegion lr = _getOwner();
     if (lr != null) {
       CachePerfStats stats = lr.getCachePerfStats();
@@ -439,6 +542,17 @@ abstract class AbstractRegionMap implements RegionMap {
     _getMap().clear();
   }
   
+  public void close() {
+    /*
+    for (SuspectEntryList l: this.suspectEntries.values()) {
+      for (EntryEventImpl e: l) {
+        e.release();
+      }
+    }
+    */
+    clear(null);
+  }
+  
   /**
    * Clear the region and, if an RVV is given, return a collection of the
    * version sources in all remaining tags
@@ -505,6 +619,9 @@ abstract class AbstractRegionMap implements RegionMap {
               boolean tombstone = re.isTombstone();
               // note: it.remove() did not reliably remove the entry so we use remove(K,V) here
               if (_getMap().remove(re.getKey(), re)) {
+                if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
+                  GatewaySenderEventImpl.release(re._getValue()); // OFFHEAP _getValue ok
+                }
                 //If this is an overflow only region, we need to free the entry on
                 //disk at this point.
                 try {
@@ -658,26 +775,35 @@ abstract class AbstractRegionMap implements RegionMap {
           .entrySetWithReusableEntries().iterator();
       while (it.hasNext()) {
         Map.Entry<Object, Object> me = it.next();
-        it.remove(); // OFFHEAP: I'm not sure that this sets the value in the ARE to removed. If it doesn't then if it is offheap we need to decrc
+        it.remove(); // This removes the RegionEntry from "rm" but it does not decrement its refcount to an offheap value.
         RegionEntry oldRe = (RegionEntry)me.getValue();
         Object key = me.getKey();
-        Object value = oldRe._getValueUse((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true);
-        if (value == Token.NOT_AVAILABLE) {
-          // fix for bug 43993
-          value = null;
-        }
-        if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) {
-          continue;
-        }
-        RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
-        copyRecoveredEntry(oldRe, newRe);
-        if (newRe.isTombstone()) {
-          VersionTag tag = newRe.getVersionStamp().asVersionTag();
-          tombstones.put(tag, newRe);
+        
+        @Retained @Released Object value = oldRe._getValueRetain((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true);
+
+        try {
+          if (value == Token.NOT_AVAILABLE) {
+            // fix for bug 43993
+            value = null;
+          }
+          if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) {
+            continue;
+          }
+          RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
+          copyRecoveredEntry(oldRe, newRe);
+          // newRe is now in this._getMap().
+          if (newRe.isTombstone()) {
+            VersionTag tag = newRe.getVersionStamp().asVersionTag();
+            tombstones.put(tag, newRe);
+          }
+          _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
+          incEntryCount(1);
+          lruEntryUpdate(newRe);
+        } finally {
+          if (OffHeapHelper.release(value)) {
+            ((OffHeapRegionEntry)oldRe).release();
+          }
         }
-        _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
-        incEntryCount(1);
-        lruEntryUpdate(newRe);
         lruUpdateCallback();
       }
     } else {
@@ -693,15 +819,7 @@ abstract class AbstractRegionMap implements RegionMap {
             tombstones.put(re.getVersionStamp().asVersionTag(), re);
           }
         }
-        if (_getOwner() instanceof BucketRegion) {
-          Object value = re._getValueUse(_getOwner(), false);
-          if (value == Token.NOT_AVAILABLE) {
-            value = null;
-          }
-          if(value != null) {
-            _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateValueSize(value));
-          }
-        }
+        _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re));
       }
       // Since lru was not being done during recovery call it now.
       lruUpdateCallback();
@@ -729,9 +847,10 @@ abstract class AbstractRegionMap implements RegionMap {
     _getMap().put(newRe.getKey(), newRe);
   }
 
+  @Retained     // Region entry may contain an off-heap value
   public final RegionEntry initRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
     boolean needsCallback = false;
-    RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
+    @Retained RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
     synchronized (newRe) {
       if (value.getVersionTag()!=null && newRe.getVersionStamp()!=null) {
         newRe.getVersionStamp().setVersions(value.getVersionTag());
@@ -746,9 +865,18 @@ abstract class AbstractRegionMap implements RegionMap {
                 _getOwner().getCachePerfStats().incRetries();
               }
             }
-          } // isRemoved
+          } 
+          /*
+           * Entry already exists which should be impossible.
+           * Free the current entry (if off-heap) and
+           * throw an exception.
+           */
           else {
-            return null;
+            if (newRe instanceof OffHeapRegionEntry) {
+              ((OffHeapRegionEntry) newRe).release();
+            }
+
+            throw new IllegalStateException("Could not recover entry for key " + key + ".  The entry already exists!");
           }
         } // synchronized
       }
@@ -877,6 +1005,10 @@ abstract class AbstractRegionMap implements RegionMap {
       RegionEntry newRe = getEntryFactory().createEntry(owner, key,
           Token.REMOVED_PHASE1);
       EntryEventImpl event = null;
+      
+      @Retained @Released Object oldValue = null;
+      
+      try {
       RegionEntry oldRe = null;
       synchronized (newRe) {
         try {
@@ -923,7 +1055,7 @@ abstract class AbstractRegionMap implements RegionMap {
                 // code will be executed only in case of sqlfabric now. Probably
                 // the code can be made more generic for both SQL Fabric and GemFire.
                 if (indexUpdater != null) {
-                  Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas
+                  oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas
                   if (oldValue instanceof ListOfDeltas) {
                   // apply the deltas on this new value. update index
                   // Make a new event object
@@ -932,15 +1064,21 @@ abstract class AbstractRegionMap implements RegionMap {
                   if (owner instanceof BucketRegion) {
                     rgn = ((BucketRegion)owner).getPartitionedRegion();
                   }
-                  event = new EntryEventImpl(rgn, Operation.CREATE, key, null,
+                  event = EntryEventImpl.create(rgn, Operation.CREATE, key, null,
                       Boolean.TRUE /* indicate that GII is in progress */,
                       false, null);
+                  try {
                   event.setOldValue(newValue);
                   if (logger.isDebugEnabled()) {
                     logger.debug("initialImagePut: received base value for list of deltas; event: {}", event);
                   }
                   ((ListOfDeltas)oldValue).apply(event);
-                  oldRe.setValue(owner, prepareValueForCache(owner, event.getNewValue()), event);
+                  Object preparedNewValue =oldRe.prepareValueForCache(owner,
+                      event.getNewValueAsOffHeapDeserializedOrRaw(), true);
+                  if(preparedNewValue instanceof Chunk) {
+                    event.setNewValue(preparedNewValue);
+                  }
+                  oldRe.setValue(owner, preparedNewValue, event);
                   //event.setNewValue(event.getOldValue());
                   event.setOldValue(null);
                   try {
@@ -955,11 +1093,17 @@ abstract class AbstractRegionMap implements RegionMap {
                     // this must be done within the oldRe sync block
                     indexUpdater.postEvent(owner, event, oldRe, done);
                   }
+                  } finally {
+                    if (event != null) {
+                      event.release();
+                      event = null;
+                    }
+                  }
                   }
                 }
                 try {
                   if (indexUpdater != null) {
-                    event = new EntryEventImpl(owner, Operation.CREATE, key,
+                    event = EntryEventImpl.create(owner, Operation.CREATE, key,
                         newValue,
                         Boolean.TRUE /* indicate that GII is in progress */,
                         false, null);
@@ -967,7 +1111,7 @@ abstract class AbstractRegionMap implements RegionMap {
                   }
                   result = oldRe.initialImagePut(owner, lastModified, newValue, wasRecovered, acceptedVersionTag);
                   if (result) {
-                    if (oldIsTombstone){
+                    if (oldIsTombstone) {
                       owner.unscheduleTombstone(oldRe);
                       if (newValue != Token.TOMBSTONE){
                         lruEntryCreate(oldRe);
@@ -1004,6 +1148,10 @@ abstract class AbstractRegionMap implements RegionMap {
                   if (indexUpdater != null) {
                     indexUpdater.postEvent(owner, event, oldRe, result);
                   }
+                  if (event != null) {
+                    event.release();
+                    event = null;
+                  }
                 }
               }
             }
@@ -1025,7 +1173,7 @@ abstract class AbstractRegionMap implements RegionMap {
             try {
               if (result) {
                 if (indexUpdater != null) {
-                  event = new EntryEventImpl(owner, Operation.CREATE, key,
+                  event = EntryEventImpl.create(owner, Operation.CREATE, key,
                       newValue,
                       Boolean.TRUE /* indicate that GII is in progress */,
                       false, null);
@@ -1050,6 +1198,10 @@ abstract class AbstractRegionMap implements RegionMap {
               if (result && indexUpdater != null) {
                 indexUpdater.postEvent(owner, event, newRe, done);
               }
+              if (event != null) {
+                event.release();
+                event = null;
+              }
             }
           }
         }
@@ -1065,6 +1217,10 @@ abstract class AbstractRegionMap implements RegionMap {
           }
        } 
       } // synchronized
+      } finally {
+        if (event != null) event.release();
+        OffHeapHelper.release(oldValue);
+      }
     } catch(RegionClearedException rce) {
       //Asif: do not issue any sort of callbacks
       done = false;
@@ -1121,7 +1277,7 @@ abstract class AbstractRegionMap implements RegionMap {
 
     boolean retry = true;
 //    int retries = -1;
-
+    
 RETRY_LOOP:
   while (retry) {
     retry = false;
@@ -1155,9 +1311,17 @@ RETRY_LOOP:
     lockForCacheModification(owner, event);
     try {
 
-    RegionEntry re = getEntry(event.getKey());
+      
+    RegionEntry re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, true, true); 
     RegionEntry tombstone = null;
     boolean haveTombstone = false;
+    /*
+     * Execute the test hook runnable inline (not threaded) if it is not null. 
+     */
+    if(null != testHookRunnableFor48182) {
+      testHookRunnableFor48182.run();
+    }    
+    
     try {
       if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT) && !(owner instanceof HARegion)) {
         logger.trace(LogMarker.LRU_TOMBSTONE_COUNT,
@@ -1214,12 +1378,13 @@ RETRY_LOOP:
                     }
                   } else {
                     event.setRegionEntry(oldRe);
+                  
                     // Last transaction related eviction check. This should
                     // prevent
                     // transaction conflict (caused by eviction) when the entry
                     // is being added to transaction state.
                     if (isEviction) {
-                      if (!confirmEvictionDestroy(oldRe)) {
+                      if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
                         opCompleted = false;
                         return opCompleted;
                       }
@@ -1276,9 +1441,14 @@ RETRY_LOOP:
                 try { // bug #42228 - leaving "removed" entries in the cache
                 re = newRe;
                 event.setRegionEntry(newRe);
+               
                 try {
                   //if concurrency checks are enabled, destroy will
                   //set the version tag
+				  if (isEviction) {
+                    opCompleted = false;
+                    return opCompleted; 
+                  }
                   opCompleted = destroyEntry(newRe, event, inTokenMode, cacheWrite, expectedOldValue, true, removeRecoveredEntry);
                   if (opCompleted) {
                     // This is a new entry that was created because we are in
@@ -1321,10 +1491,14 @@ RETRY_LOOP:
                 // Note no need for LRU work since the entry is destroyed
                 // and will be removed when gii completes
                 } finally { // bug #42228
-                  if (!opCompleted && !haveTombstone /* to fix bug 51583 do this for all operations */) {
+                  if (!opCompleted && !haveTombstone  /* to fix bug 51583 do this for all operations */ ) {
+                    
 //                    owner.getLogWriterI18n().warning(LocalizedStrings.DEBUG, "BRUCE: removing incomplete entry");
                     removeEntry(event.getKey(), newRe, false);
                   }
+				  if (!opCompleted && isEviction) {
+                  	removeEntry(event.getKey(), newRe, false);
+                  }
                 }
               } // !opCompleted
             } // synchronized newRe
@@ -1335,7 +1509,7 @@ RETRY_LOOP:
           }
         } // inTokenMode or tombstone creation
         else {
-          if (!isEviction || owner.concurrencyChecksEnabled) {
+          if (!isEviction || owner.concurrencyChecksEnabled) {                                 
             // The following ensures that there is not a concurrent operation
             // on the entry and leaves behind a tombstone if concurrencyChecksEnabled.
             // It fixes bug #32467 by propagating the destroy to the server even though
@@ -1390,9 +1564,11 @@ RETRY_LOOP:
                       doPart3 = true;
                     }
                   }
-                  if (throwex) {
+                  if (throwex) {                    
                     if (ex == null) {
-                      throw new EntryNotFoundException(event.getKey().toString());
+                      // Fix for 48182, check cache state and/or region state before sending entry not found.
+                      // this is from the server and any exceptions will propogate to the client
+                      owner.checkEntryNotFound(event.getKey());
                     } else {
                       throw ex;
                     }
@@ -1481,10 +1657,11 @@ RETRY_LOOP:
                 }
               }
               event.setRegionEntry(re);
+              
               // See comment above about eviction checks
               if (isEviction) {
                 assert expectedOldValue == null;
-                if (!confirmEvictionDestroy(re)) {
+                if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
                   opCompleted = false;
                   return opCompleted;
                 }
@@ -1566,6 +1743,12 @@ RETRY_LOOP:
               }
             } // !isRemoved
             else { // already removed
+              if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) {
+                // For HDFS region there may be a race with eviction
+                // so retry the operation. fixes bug 49150
+                retry = true;
+                continue RETRY_LOOP;
+              }
               if (re.isTombstone() && event.getVersionTag() != null) {
                 // if we're dealing with a tombstone and this is a remote event
                 // (e.g., from cache client update thread) we need to update
@@ -1585,7 +1768,7 @@ RETRY_LOOP:
               }
 
               if (!inTokenMode && !isEviction) {
-                throw new EntryNotFoundException(event.getKey().toString());
+                owner.checkEntryNotFound(event.getKey());
               }
 //              if (isEviction && re.isTombstone()) {
 //                owner.unscheduleTombstone(re);
@@ -1668,6 +1851,7 @@ RETRY_LOOP:
 
     final boolean isRegionReady = !inTokenMode;
     final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId());
+    boolean cbEventInPending = false;
     lockForTXCacheModification(owner, versionTag);
     IndexManager oqlIndexManager = owner.getIndexManager() ; 
     try {
@@ -1683,7 +1867,8 @@ RETRY_LOOP:
           synchronized (re) {
             if (!re.isRemoved() || re.isTombstone()) {
               EntryEventImpl sqlfEvent = null;
-              Object oldValue = re.getValueInVM(owner); // OFFHEAP escapes to eei and sqlfEvent
+              @Retained @Released Object oldValue = re.getValueInVM(owner);
+              try {
               final int oldSize = owner.calculateRegionEntryValueSize(re);
               // Create an entry event only if the calling context is
               // a receipt of a TXCommitMessage AND there are callbacks installed
@@ -1691,6 +1876,7 @@ RETRY_LOOP:
               boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/, isRegionReady || inRI);
               EntryEventImpl cbEvent = createCBEvent(owner, op,
                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+              try {
               
               if (/* owner.isUsedForPartitionedRegionBucket() && */ 
                   indexUpdater != null) {
@@ -1753,6 +1939,7 @@ RETRY_LOOP:
                       cbEvent, true/*callDispatchListenerEvent*/);
                 } else {
                   pendingCallbacks.add(cbEvent);
+                  cbEventInPending = true;
                 }
               }
               if (!clearOccured) {
@@ -1761,6 +1948,12 @@ RETRY_LOOP:
               if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent!= null) {
                 txEntryState.setVersionTag(cbEvent.getVersionTag());
               }
+              } finally {
+                if (!cbEventInPending) cbEvent.release();
+              }
+              } finally {
+                OffHeapHelper.release(oldValue);
+              }
             }
           }
         } finally {
@@ -1795,6 +1988,7 @@ RETRY_LOOP:
                     boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
                     cbEvent = createCBEvent(owner, op,
                         key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+                    try {
                     cbEvent.setRegionEntry(oldRe);
                     cbEvent.setOldValue(Token.NOT_AVAILABLE);
                     if (isDebugEnabled) {
@@ -1811,6 +2005,7 @@ RETRY_LOOP:
                             cbEvent, dispatchListenerEvent);
                       } else {
                         pendingCallbacks.add(cbEvent);
+                        cbEventInPending = true;
                       }
                     }
                     int oldSize = 0;
@@ -1829,6 +2024,9 @@ RETRY_LOOP:
                     owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
                         false /* Clear Conflicting with the operation */);
                     lruEntryDestroy(oldRe);
+                    } finally {
+                      if (!cbEventInPending) cbEvent.release();
+                    }
                   }
                   catch (RegionClearedException rce) {
                     owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
@@ -1852,6 +2050,7 @@ RETRY_LOOP:
               boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
               cbEvent = createCBEvent(owner, op,
                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+              try {
               cbEvent.setRegionEntry(newRe);
               cbEvent.setOldValue(Token.NOT_AVAILABLE);
               if (isDebugEnabled) {
@@ -1868,6 +2067,7 @@ RETRY_LOOP:
                       cbEvent, dispatchListenerEvent);
                 } else {
                   pendingCallbacks.add(cbEvent);
+                  cbEventInPending = true;
                 }
               }
               EntryLogger.logTXDestroy(_getOwnerObject(), key);
@@ -1885,6 +2085,9 @@ RETRY_LOOP:
                       false /*clearConflict*/);
               // Note no need for LRU work since the entry is destroyed
               // and will be removed when gii completes
+              } finally {
+                if (!cbEventInPending) cbEvent.release();
+              }
             }
             if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) {
               txEntryState.setVersionTag(cbEvent.getVersionTag());
@@ -1906,6 +2109,7 @@ RETRY_LOOP:
         EntryEventImpl cbEvent = createCBEvent(owner, op,
             key, null, txId, txEvent, eventId, aCallbackArgument, 
             filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+        try {
         if (owner.isUsedForPartitionedRegionBucket()) {
           txHandleWANEvent(owner, cbEvent, txEntryState);
         }
@@ -1914,6 +2118,10 @@ RETRY_LOOP:
           owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,cbEvent,false);
         } else {
           pendingCallbacks.add(cbEvent);
+          cbEventInPending = true;
+        }
+        } finally {
+          if (!cbEventInPending) cbEvent.release();
         }
       }
     } catch( DiskAccessException dae) {
@@ -2003,13 +2211,19 @@ RETRY_LOOP:
                       FilterProfile fp = owner.getFilterProfile();
                       if (!oldRe.isRemoved() && 
                           (fp != null && fp.getCqCount() > 0)) {
-                        Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl oldValue
+                        
+                        @Retained @Released Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl oldValue
+                        
                         // this will not fault in the value.
+                        try {
                         if (oldValue == Token.NOT_AVAILABLE){
                           event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner));
                         } else {
                           event.setOldValue(oldValue);
                         }
+                        } finally {
+                          OffHeapHelper.release(oldValue);
+                        }
                       }
                       boolean isCreate = false;
                       try {
@@ -2257,7 +2471,14 @@ RETRY_LOOP:
                     if (re.isValueNull()) {
                       event.setOldValue(re.getValueOnDiskOrBuffer(owner));
                     } else {
-                      event.setOldValue(re.getValueInVM(owner)); // OFFHEAP escapes to EntryEventImpl oldValue
+                      
+                      @Retained @Released Object v = re.getValueInVM(owner);
+                      
+                      try {
+                        event.setOldValue(v); // OFFHEAP escapes to EntryEventImpl oldValue
+                      } finally {
+                        OffHeapHelper.release(v);
+                      }
                     }
                   }
                   final boolean oldWasTombstone = re.isTombstone();
@@ -2298,7 +2519,7 @@ RETRY_LOOP:
             // is in region, do nothing
           }
           if (!entryExisted) {
-            throw new EntryNotFoundException(event.getKey().toString());
+            owner.checkEntryNotFound(event.getKey());
           }
         } // while(retry)
       } // !forceNewEntry
@@ -2394,7 +2615,7 @@ RETRY_LOOP:
         }
       }
       if (!entryExisted) {
-        throw new EntryNotFoundException(event.getKey().toString());
+        owner.checkEntryNotFound(event.getKey());
       }
     }  catch( DiskAccessException dae) {
       this._getOwner().handleDiskAccessException(dae);
@@ -2449,14 +2670,17 @@ RETRY_LOOP:
                     final boolean oldWasTombstone = oldRe.isTombstone();
                     final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
                     Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP eei
+                    try {
                     // Create an entry event only if the calling context is
                     // a receipt of a TXCommitMessage AND there are callbacks
                     // installed
                     // for this region
                     boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+                    boolean cbEventInPending = false;
                     cbEvent = createCBEvent(owner, 
                         localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
                         key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+                    try {
                     cbEvent.setRegionEntry(oldRe);
                     cbEvent.setOldValue(oldValue);
                     if (logger.isDebugEnabled()) {
@@ -2475,7 +2699,7 @@ RETRY_LOOP:
                     processAndGenerateTXVersionTag(owner, cbEvent, oldRe, txEntryState);
                     boolean clearOccured = false;
                     try {
-                      oldRe.setValue(owner, prepareValueForCache(owner, newValue));
+                      oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, true));
                       EntryLogger.logTXInvalidate(_getOwnerObject(), key);
                       owner.updateSizeOnPut(key, oldSize, 0);
                       if (oldWasTombstone) {
@@ -2496,6 +2720,7 @@ RETRY_LOOP:
                             true/*callDispatchListenerEvent*/);
                       } else {
                         pendingCallbacks.add(cbEvent);
+                        cbEventInPending = true;
                       }
                     }
                     if (!clearOccured) {
@@ -2504,21 +2729,29 @@ RETRY_LOOP:
                     if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) {
                       txEntryState.setVersionTag(cbEvent.getVersionTag());
                     }
+                    } finally {
+                      if (!cbEventInPending) cbEvent.release();
+                    }
+                    } finally {
+                      OffHeapHelper.release(oldValue);
+                    }
                   }
                 }
               }
               if (!opCompleted) {
                 boolean invokeCallbacks = shouldCreateCBEvent( owner, true /* isInvalidate */, owner.isInitialized());
+                boolean cbEventInPending = false;
                 cbEvent = createCBEvent(owner, 
                     localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
                         key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+                try {
                 cbEvent.setRegionEntry(newRe);
                 txRemoveOldIndexEntry(Operation.INVALIDATE, newRe);
                 newRe.setValueResultOfSearch(false);
                 boolean clearOccured = false;
                 try {
                   processAndGenerateTXVersionTag(owner, cbEvent, newRe, txEntryState);
-                  newRe.setValue(owner, prepareValueForCache(owner, newValue));
+                  newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, true));
                   EntryLogger.logTXInvalidate(_getOwnerObject(), key);
                   owner.updateSizeOnCreate(newRe.getKey(), 0);//we are putting in a new invalidated entry
                 }
@@ -2536,6 +2769,7 @@ RETRY_LOOP:
                         true/*callDispatchListenerEvent*/);
                   } else {
                     pendingCallbacks.add(cbEvent);
+                    cbEventInPending = true;
                   }
                 }
                 opCompleted = true;
@@ -2546,6 +2780,9 @@ RETRY_LOOP:
                 if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) {
                   txEntryState.setVersionTag(cbEvent.getVersionTag());
                 }
+                } finally {
+                  if (!cbEventInPending) cbEvent.release();
+                }
               }
             }
             finally {
@@ -2568,9 +2805,11 @@ RETRY_LOOP:
                 // installed
                 // for this region
                 boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+                boolean cbEventInPending = false;
                 cbEvent = createCBEvent(owner, 
                     localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, 
                         key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+                try {
                 cbEvent.setRegionEntry(re);
                 cbEvent.setOldValue(oldValue);
                 txRemoveOldIndexEntry(Operation.INVALIDATE, re);
@@ -2584,7 +2823,7 @@ RETRY_LOOP:
                 processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
                 boolean clearOccured = false;
                 try {
-                  re.setValue(owner, prepareValueForCache(owner, newValue));
+                  re.setValue(owner, re.prepareValueForCache(owner, newValue, true));
                   EntryLogger.logTXInvalidate(_getOwnerObject(), key);
                   if (wasTombstone) {
                     owner.unscheduleTombstone(re);
@@ -2605,6 +2844,7 @@ RETRY_LOOP:
                         true/*callDispatchListenerEvent*/);
                   } else {
                     pendingCallbacks.add(cbEvent);
+                    cbEventInPending = true;
                   }
                 }
                 if (!clearOccured) {
@@ -2613,6 +2853,9 @@ RETRY_LOOP:
                 if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) {
                   txEntryState.setVersionTag(cbEvent.getVersionTag());
                 }
+                } finally {
+                  if (!cbEventInPending) cbEvent.release();
+                }
               }
             }
         } else  { //re == null
@@ -2621,16 +2864,22 @@ RETRY_LOOP:
           // that the invalidate is already applied on the Initial image 
           // provider, thus causing region entry to be absent. 
           // Notify clients with client events.
+          boolean cbEventInPending = false;
           cbEvent = createCBEvent(owner, 
               localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, 
                   key, newValue, txId, txEvent, eventId, aCallbackArgument, 
                   filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
+          try {
           switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
           if (pendingCallbacks == null) {
             owner.invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE,
                 cbEvent, false);
           } else {
             pendingCallbacks.add(cbEvent);
+            cbEventInPending = true;
+          }
+          } finally {
+            if (!cbEventInPending) cbEvent.release();
           }
         }
       }
@@ -2669,10 +2918,19 @@ RETRY_LOOP:
     }
   }
 
-  private RegionEntry getOrCreateRegionEntry(Object ownerRegion, Object key, Object value, boolean onlyExisting) {
-    RegionEntry retVal = getEntry(key);
+  private RegionEntry getOrCreateRegionEntry(Object ownerRegion,
+      EntryEventImpl event, Object value,
+      MapCallbackAdapter<Object, Object, Object, Object> valueCreator,
+      boolean onlyExisting, boolean returnTombstone) {
+    Object key = event.getKey();
+    RegionEntry retVal = null;
+    if (event.isFetchFromHDFS()) {
+      retVal = getEntry(event);
+    } else {
+      retVal = getEntryInVM(key);
+    }
     if (onlyExisting) {
-      if (retVal != null && retVal.isTombstone()) {
+      if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
         return null;
       }
       return retVal;
@@ -2680,14 +2938,30 @@ RETRY_LOOP:
     if (retVal != null) {
       return retVal;
     }
+    if (valueCreator != null) {
+      value = valueCreator.newValue(key, ownerRegion, value, null);
+    }
     retVal = getEntryFactory().createEntry((RegionEntryContext) ownerRegion, key, value);
     RegionEntry oldRe = putEntryIfAbsent(key, retVal);
     if (oldRe != null) {
+      if (retVal instanceof OffHeapRegionEntry) {
+        ((OffHeapRegionEntry) retVal).release();
+      }
       return oldRe;
     }
     return retVal;
   }
 
+  protected static final MapCallbackAdapter<Object, Object, Object, Object>
+      listOfDeltasCreator = new MapCallbackAdapter<Object, Object,
+          Object, Object>() {
+    @Override
+    public Object newValue(Object key, Object context, Object createParams,
+        final MapResult result) {
+      return new ListOfDeltas(4);
+    }
+  };
+  
   /**
    * Neeraj: The below if block is to handle the special
    * scenario witnessed in Sqlfabric for now. (Though its
@@ -2708,28 +2982,37 @@ RETRY_LOOP:
       boolean isOldValueDelta = true;
       try {
         if (ifOld) {
-          final Delta delta = (Delta)event.getRawNewValue();
-          RegionEntry re = getOrCreateRegionEntry(owner, event.getKey(), new ListOfDeltas(delta), false);
+          final Delta delta = event.getDeltaNewValue();
+		  RegionEntry re = getOrCreateRegionEntry(owner, event, null,
+          	  listOfDeltasCreator, false, false);
           assert re != null;
           synchronized (re) {
-            Object oVal = re.getValueInVMOrDiskWithoutFaultIn(owner);
+            @Retained @Released Object oVal = re.getValueOffHeapOrDiskWithoutFaultIn(owner);
             if (oVal != null) {
+              try {
               if (oVal instanceof ListOfDeltas) {
                 if (logger.isDebugEnabled()) {
                   logger.debug("basicPut: adding delta to list of deltas: {}", delta);
                 }
                 ((ListOfDeltas)oVal).merge(delta);
+                @Retained Object newVal = ((AbstractRegionEntry)re).prepareValueForCache(owner, oVal, true);              
+                re.setValue(owner, newVal); // TODO:KIRK:48068 prevent orphan
               }
               else {
                 isOldValueDelta = false;
               }
+              }finally {
+                OffHeapHelper.release(oVal);
+              }
             }
             else {
               if (logger.isDebugEnabled()) {
                 logger.debug("basicPut: new list of deltas with delta: {}", delta);
               }
-              Object newVal = new ListOfDeltas(delta);
-              re.setValue(owner, newVal); // TODO no need to call AbstractRegionMap.prepareValueForCache here?
+              @Retained Object newVal = new ListOfDeltas(delta);
+              // TODO no need to call AbstractRegionMap.prepareValueForCache here?
+              newVal = ((AbstractRegionEntry)re).prepareValueForCache(owner, newVal, true);
+              re.setValue(owner, newVal); // TODO:KIRK:48068 prevent orphan
             }
           }
         }
@@ -2812,7 +3095,6 @@ RETRY_LOOP:
 
     boolean retrieveOldValueForDelta = event.getDeltaBytes() != null
         && event.getRawNewValue() == null;
-    Object oldValueForDelta = null;
     lockForCacheModification(owner, event);
     IndexManager oqlIndexManager = null;
     try {
@@ -2842,7 +3124,8 @@ RETRY_LOOP:
         RegionEntry re = null;
         boolean eventRecorded = false;
         boolean onlyExisting = ifOld && !replaceOnClient;
-        re = getOrCreateRegionEntry(owner, event.getKey(), Token.REMOVED_PHASE1, onlyExisting);
+		re = getOrCreateRegionEntry(owner, event, 
+		    Token.REMOVED_PHASE1, null, onlyExisting, false);
         if (re == null) {
           throwExceptionForSqlFire(event);
           return null;
@@ -2853,8 +3136,9 @@ RETRY_LOOP:
             // from the map. otherwise we can append an event to it
             // and change its state
             if (re.isRemovedPhase2()) {
-              re = getOrCreateRegionEntry(owner, event.getKey(), Token.REMOVED_PHASE1, onlyExisting);
-              _getOwner().getCachePerfStats().incRetries();
+                re = getOrCreateRegionEntry(owner, event,
+                    Token.REMOVED_PHASE1, null, onlyExisting, false);
+                _getOwner().getCachePerfStats().incRetries();
               if (re == null) {
                 // this will happen when onlyExisting is true
                 throwExceptionForSqlFire(event);
@@ -2862,6 +3146,7 @@ RETRY_LOOP:
               }
               continue;
             } else {
+              @Released Object oldValueForDelta = null;
               if (retrieveOldValueForDelta) {
                 // defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
                 final boolean disabled = disableLruUpdateCallback();
@@ -2897,47 +3182,92 @@ RETRY_LOOP:
 
                 // notify index of an update
                 notifyIndex(re, true);
-                try {
-
                   try {
-                    if ((cacheWrite && event.getOperation().isUpdate()) // if there is a cacheWriter, type of event has already been set
-                        || !re.isRemoved()
-                        || replaceOnClient) {
-                      // update
-                      updateEntry(event, requireOldValue, oldValueForDelta, re);
-                    } else {
-                      // create
-                      createEntry(event, owner, re);
+                    try {
+                      if ((cacheWrite && event.getOperation().isUpdate()) // if there is a cacheWriter, type of event has already been set
+                          || !re.isRemoved()
+                          || replaceOnClient) {
+                        // update
+                        updateEntry(event, requireOldValue, oldValueForDelta, re);
+                      } else {
+                        // create
+                        createEntry(event, owner, re);
+                      }
+                      owner.recordEvent(event);
+                      eventRecorded = true;
+                    } catch (RegionClearedException rce) {
+                      clearOccured = true;
+                      owner.recordEvent(event);
+                    } catch (ConcurrentCacheModificationException ccme) {
+                      VersionTag tag = event.getVersionTag();
+                      if (tag != null && tag.isTimeStampUpdated()) {
+                        // Notify gateways of new time-stamp.
+                        owner.notifyTimestampsToGateways(event);
+                      }
+                      throw ccme;
                     }
-                    owner.recordEvent(event);
-                    eventRecorded = true;
-                  } catch (RegionClearedException rce) {
-                    clearOccured = true;
-                    owner.recordEvent(event);
-                  } catch (ConcurrentCacheModificationException ccme) {
-                    VersionTag tag = event.getVersionTag();
-                    if (tag != null && tag.isTimeStampUpdated()) {
-                      // Notify gateways of new time-stamp.
-                      owner.notifyTimestampsToGateways(event);
+                    if (uninitialized) {
+                      event.inhibitCacheListenerNotification(true);
                     }
-                    throw ccme;
+                    updateLru(clearOccured, re, event);
+
+                    lastModifiedTime = owner.basicPutPart2(event, re,
+                        !uninitialized, lastModifiedTime, clearOccured);
+                  } finally {
+                    notifyIndex(re, false);
                   }
-                  if (uninitialized) {
-                    event.inhibitCacheListenerNotification(true);
+                  result = re;
+                  break;
+                } finally {
+                  OffHeapHelper.release(oldValueForDelta);
+                  if (re != null && !onlyExisting && !isOpComplete(re, event)) {
+                    owner.cleanUpOnIncompleteOp(event, re, eventRecorded,
+                        false/* updateStats */, replaceOnClient);
                   }
-                  updateLru(clearOccured, re, event);
+                  else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
+                  BucketRegion br = (BucketRegion)owner;
+                  CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
+                  long startTime= stats.startCustomEviction();
+                  CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
+                  // No need to update indexes if entry was faulted in but operation did not succeed. 
+                  if (csAttr != null && (csAttr.isEvictIncoming() || re.isMarkedForEviction())) {
+                    
+                    if (csAttr.getCriteria().doEvict(event)) {
+                      stats.incEvictionsInProgress();
+                      // set the flag on event saying the entry should be evicted 
+                      // and not indexed
+                      EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(),
+                          null/* newValue */, null, false, owner.getMyId());
+                      try {
 
-                  lastModifiedTime = owner.basicPutPart2(event, re,
-                      !uninitialized, lastModifiedTime, clearOccured);
-                } finally {
-                  notifyIndex(re, false);
-                }
-                result = re;
-                break;
-              } finally {
-                if (re != null && !isOpComplete(re, event)) {
-                  owner.cleanUpOnIncompleteOp(event, re, eventRecorded,
-                      false/* updateStats */, replaceOnClient);
+                      destroyEvent.setOldValueFromRegion();
+                      destroyEvent.setCustomEviction(true);
+                      destroyEvent.setPossibleDuplicate(event.isPossibleDuplicate());
+                      if(logger.isDebugEnabled()) {
+                        logger.debug("Evicting the entry " + destroyEvent);
+                      }
+                      if(result != null) {
+                        removeEntry(event.getKey(),re, true, destroyEvent,owner, indexUpdater);
+                      }
+                      else{
+                        removeEntry(event.getKey(),re, true, destroyEvent,owner, null);
+                      }
+                      //mark the region entry for this event as evicted 
+                      event.setEvicted();
+                      stats.incEvictions();
+                      if(logger.isDebugEnabled()) {
+                        logger.debug("Evicted the entry " + destroyEvent);
+                      }
+                      //removeEntry(event.getKey(), re);
+                      } finally {
+                        destroyEvent.release();
+                        stats.decEvictionsInProgress();
+                      }
+                    } else {
+                      re.clearMarkedForEviction();
+                    }
+                  }
+                  stats.endCustomEviction(startTime);
                 }
               } // try
             }
@@ -3018,10 +3348,18 @@ RETRY_LOOP:
     // replace is propagated to server, so no need to check
     // satisfiesOldValue on client
     if (expectedOldValue != null && !replaceOnClient) {
-      Object v = re._getValueUse(event.getLocalRegion(), true);
-        if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v)) {
+      SimpleMemoryAllocatorImpl.skipRefCountTracking();
+      
+      @Retained @Released Object v = re._getValueRetain(event.getLocalRegion(), true);
+      
+      SimpleMemoryAllocatorImpl.unskipRefCountTracking();
+      try {
+        if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v, event.getLocalRegion())) {
           return false;
         }
+      } finally {
+        OffHeapHelper.releaseWithNoTracking(v);
+      }
     }
     return true;
   }
@@ -3038,15 +3376,43 @@ RETRY_LOOP:
   // calculations will be incorrect in case the value was read from
   // disk but not brought into the VM like what getValueInVMOrDisk
   // method does when value is not found in VM
+  // PRECONDITION: caller must be synced on re
   private void setOldValueInEvent(EntryEventImpl event, RegionEntry re, boolean cacheWrite, boolean requireOldValue) {
-    if (getIndexUpdater() != null || cacheWrite || requireOldValue ||
-        event.getOperation().guaranteesOldValue()) {
-      if (event.hasDelta() || event.getOperation().guaranteesOldValue()) {
-        Object oldValueInVMOrDisk = re.getValueOffHeapOrDiskWithoutFaultIn(event.getLocalRegion());
-          event.setOldValue(oldValueInVMOrDisk, requireOldValue);
+    boolean needToSetOldValue = getIndexUpdater() != null || cacheWrite || requireOldValue || event.getOperation().guaranteesOldValue();
+    if (needToSetOldValue) {
+      if (event.hasDelta() || event.getOperation().guaranteesOldValue()
+          || GemFireCacheImpl.sqlfSystem()) {
+        // In these cases we want to even get the old value from disk if it is not in memory
+        SimpleMemoryAllocatorImpl.skipRefCountTracking();
+        @Released Object oldValueInVMOrDisk = re.getValueOffHeapOrDiskWithoutFaultIn(event.getLocalRegion());
+        SimpleMemoryAllocatorImpl.unskipRefCountTracking();
+        try {
+          event.setOldValue(oldValueInVMOrDisk, requireOldValue
+              || GemFireCacheImpl.sqlfSystem());
+        } finally {
+          OffHeapHelper.releaseWithNoTracking(oldValueInVMOrDisk);
+        }
       } else {
-        Object oldValueInVM = re._getValueUse(event.getLocalRegion(), true); // OFFHEAP: re synced so can use its ref.
-          event.setOldValue(oldValueInVM, requireOldValue);
+        // In these cases only need the old value if it is in memory
+        SimpleMemoryAllocatorImpl.skipRefCountTracking();
+        
+        @Retained @Released Object oldValueInVM = re._getValueRetain(event.getLocalRegion(), true); // OFFHEAP: re synced so can use its ref.
+        
+        SimpleMemoryAllocatorImpl.unskipRefCountTracking();
+        try {
+          event.setOldValue(oldValueInVM,
+              requireOldValue || GemFireCacheImpl.sqlfSystem());
+        } finally {
+          OffHeapHelper.releaseWithNoTracking(oldValueInVM);
+        }
+      }
+    } else {
+      // if the old value is in memory then if it is a GatewaySenderEventImpl then
+      // we want to set the old value.
+      @Unretained Object ov = re._getValue(); // OFFHEAP _getValue is ok since re is synced and we only use it if its a GatewaySenderEventImpl.
+      // Since GatewaySenderEventImpl is never stored in an off-heap region nor a compressed region we don't need to worry about ov being compressed.
+      if (ov instanceof GatewaySenderEventImpl) {
+        event.setOldValue(ov, true);
       }
     }
   }
@@ -3178,7 +3544,7 @@ RETRY_LOOP:
   }
 
   protected boolean destroyEntry(RegionEntry re, EntryEventImpl event,
-      boolean inTokenMode, boolean cacheWrite, Object expectedOldValue,
+      boolean inTokenMode, boolean cacheWrite, @Released Object expectedOldValue,
       boolean forceDestroy, boolean removeRecoveredEntry)
       throws CacheWriterException, TimeoutException, EntryNotFoundException,
       RegionClearedException {
@@ -3216,8 +3582,10 @@ RETRY_LOOP:
     EntryEventImpl cbEvent = null;
     EntryEventImpl sqlfEvent = null;
     boolean invokeCallbacks = shouldCreateCBEvent(owner, false /*isInvalidate*/, isRegionReady);
+    boolean cbEventInPending = false;
     cbEvent = createCBEvent(owner, putOp, key, newValue, txId, 
         txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
+    try {
     if (logger.isDebugEnabled()) {
       logger.debug("txApplyPut cbEvent={}", cbEvent);
     }
@@ -3293,7 +3661,7 @@ RETRY_LOOP:
                         //cbEvent.putExistingEntry(owner, re);
                         sqlfEvent.putExistingEntry(owner, re);
                       } else {
-                        re.setValue(owner, prepareValueForCache(owner, newValue, cbEvent));
+                        re.setValue(owner, re.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
                       }
                       if (putOp.isCreate()) {
                         owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re));
@@ -3332,6 +3700,7 @@ RETRY_LOOP:
                           cbEvent, hasRemoteOrigin);
                     } else {
                       pendingCallbacks.add(cbEvent);
+                      cbEventInPending = true;
                     }
                   }
                   if (!clearOccured) {
@@ -3400,7 +3769,7 @@ RETRY_LOOP:
                         //cbEvent.putExistingEntry(owner, oldRe);
                         sqlfEvent.putExistingEntry(owner, oldRe);
                       } else {
-                        oldRe.setValue(owner, prepareValueForCache(owner, newValue, cbEvent));
+                        oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
                         if (wasTombstone) {
                           owner.unscheduleTombstone(oldRe);
                         }
@@ -3444,6 +3813,7 @@ RETRY_LOOP:
                           cbEvent, true/*callDispatchListenerEvent*/);
                     } else {
                       pendingCallbacks.add(cbEvent);
+                      cbEventInPending = true;
                     }
                   }
                   if (!clearOccured) {
@@ -3479,7 +3849,7 @@ RETRY_LOOP:
                   if (sqlfEvent != null ) {
                     sqlfEvent.putNewEntry(owner,newRe);
                   } else {
-                    newRe.setValue(owner, prepareValueForCache(owner, newValue, cbEvent));
+                    newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
                   }
                   owner.updateSizeOnCreate(newRe.getKey(), owner.calculateRegionEntryValueSize(newRe));
                 }
@@ -3508,6 +3878,7 @@ RETRY_LOOP:
                       true/*callDispatchListenerEvent*/);
                 } else {
                   pendingCallbacks.add(cbEvent);
+                  cbEventInPending = true;
                 }
               }
               if (!clearOccured) {
@@ -3535,6 +3906,10 @@ RETRY_LOOP:
         oqlIndexManager.countDownIndexUpdaters();
       }
     }
+    } finally {
+      if (!cbEventInPending) cbEvent.release();
+      if (sqlfEvent != null) sqlfEvent.release();
+    }
   }
 
   private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl cbEvent, TXEntryState txEntryState) {
@@ -3564,6 +3939,15 @@ RETRY_LOOP:
       } catch (ConcurrentCacheModificationException ignore) {
         // ignore this execption, however invoke callbacks for this operation
       }
+
+      // For distributed transactions, stuff the next region version generated
+      // in phase-1 commit into the cbEvent so that ARE.generateVersionTag can later
+      // just apply it and not regenerate it in phase-2 commit
+      if (cbEvent != null && txEntryState != null && txEntryState.getDistTxEntryStates() != null) {
+        cbEvent.setNextRegionVersion(txEntryState.getDistTxEntryStates().getRegionVersion());  
+      }
+      
+      //cbEvent.setNextRegionVersion(txEntryState.getNextRegionVersion());
       owner.generateAndSetVersionTag(cbEvent, re);
     }
   }
@@ -3588,47 +3972,6 @@ RETRY_LOOP:
     return event;
   }
 
-  public static Object prepareValueForCache(RegionEntryContext r, Object val) {
-    return prepareValueForCache(r, val, null);
-  }
-  /**
-   * Prepares and returns a value to be stored in the cache.
-   * Current prep is to make sure a PdxInstance is not stored in the cache
-   * and to copy values into offheap memory of the region is using off heap storage.
-   * 
-   * @param r the region the prepared object will be stored in
-   * @param val the value that will be stored
-   * @return the prepared value
-   */
-  public static Object prepareValueForCache(RegionEntryContext r, Object val, EntryEventImpl event) {
-    Object nv = val;
-    if (nv instanceof PdxInstance) {
-      // We do not want to put PDXs in the cache as values.
-      // So get the serialized bytes and use a CachedDeserializable.
-      try {
-        byte[] data = ((ConvertableToBytes)nv).toBytes();
-        byte[] compressedData = compressBytes(r, data);
-        if (data == compressedData) {
-          nv = CachedDeserializableFactory.create(data);
-        } else {
-          nv = compressedData;
-        }
-      } catch (IOException e) {
-        throw new PdxSerializationException("Could not convert " + nv + " to bytes", e);
-      }
-    } else {
-      nv = AbstractRegionEntry.compress(r, nv, event);
-    }
-    return nv;
-  }
-  
-  private static byte[] compressBytes(RegionEntryContext context, byte[] value) {
-    if (AbstractRegionEntry.isCompressible(context, value)) {
-      value = context.getCompressor().compress(value);
-    }
-    return value;
-  }
-  
   /**
    * Removing the existing indexed value requires the current value in the cache, 
    * that is the one prior to applying the operation.
@@ -3702,10 +4045,13 @@ RETRY_LOOP:
       eventRegion = re.getPartitionedRegion();
     }
     
-    EntryEventImpl retVal = new EntryEventImpl(
+    EntryEventImpl retVal = EntryEventImpl.create(
         re, op, key, newValue,
         aCallbackArgument,
         txEntryState == null, originator);
+    boolean returnedRetVal = false;
+    try {
+ 
     
     if(bridgeContext!=null) {
       retVal.setContext(bridgeContext);
@@ -3778,7 +4124,13 @@ RETRY_LOOP:
       }
     }    
     retVal.setTransactionId(txId);
+    returnedRetVal = true;
     return retVal;
+    } finally {
+      if (!returnedRetVal) {
+        retVal.release();
+      }
+    }
   }
 
   public final void writeSyncIfPresent(Object key, Runnable runner)
@@ -3892,7 +4244,14 @@ RETRY_LOOP:
     if (actualRe != re) {  // null actualRe is okay here
       return true; // tombstone was evicted at some point
     }
-    int entryVersion = re.getVersionStamp().getEntryVersion();
+    VersionStamp vs = re.getVersionStamp();
+    if (vs == null) {
+      // if we have no VersionStamp why were we even added as a tombstone?
+      // We used to see an NPE here. See bug 52092.
+      logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
+      return true;
+    }
+    int entryVersion = vs.getEntryVersion();
     boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
     return !isSameTombstone;
   }
@@ -3903,8 +4262,8 @@ RETRY_LOOP:
     int destroyedVersion = version.getEntryVersion();
     DiskRegion dr = this._getOwner().getDiskRegion();
 
+    synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
         synchronized (re) {
-         synchronized(this._getOwner().getSizeGuard()) {
           int entryVersion = re.getVersionStamp().getEntryVersion();
           boolean isTombstone = re.isTombstone();
           boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java
index 1b2a8c1..2a88268 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java
@@ -322,5 +322,10 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
         throw ex;
       }
     }
+    
+    @Override
+    protected boolean mayAddToMultipleSerialGateways(DistributionManager dm) {
+      return _mayAddToMultipleSerialGateways(dm);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 40c8808..1299d75 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -882,7 +882,24 @@ public final class BucketAdvisor extends CacheDistributionAdvisor  {
    */
   private final InternalDistributedMember getExistingPrimary() {
     return basicGetPrimaryMember();
-  } 
+  }
+  
+  /**
+   * If the current member is primary for this bucket return true, otherwise, 
+   * give some time for the current member to become primary and
+   * then return whether it is a primary (true/false).
+   */
+  public final boolean isPrimaryWithWait() {
+    if (this.isPrimary()) {
+      return true;
+    }
+    // wait for the current member to become primary holder
+    InternalDistributedMember primary = waitForNewPrimary(); 
+    if(primary != null) {
+        return true;
+    }
+    return false;
+  }
 
   /** 
    * This method was split out from getPrimary() due to bug #40639
@@ -976,6 +993,11 @@ public final class BucketAdvisor extends CacheDistributionAdvisor  {
       }
     } finally {
       if (lostPrimary) {
+        Bucket br = this.regionAdvisor.getBucket(getBucket().getId());
+        if( br != null && br instanceof BucketRegion) {
+          ((BucketRegion)br).beforeReleasingPrimaryLockDuringDemotion();
+        }
+
         releasePrimaryLock();
         // this was a deposePrimary call so we need to depose children as well
         deposePrimaryForColocatedChildren();
@@ -1582,7 +1604,39 @@ public final class BucketAdvisor extends CacheDistributionAdvisor  {
       return false;
     }
   }
+
+  private final static long BUCKET_STORAGE_WAIT = Long.getLong("gemfire.BUCKET_STORAGE_WAIT", 15000).longValue(); // 15 seconds
   
+  public boolean waitForStorage() {
+    synchronized (this) {
+      // let's park this thread and wait for storage!
+      StopWatch timer = new StopWatch(true);
+      try {
+        for (;;) {
+          if (this.regionAdvisor.isBucketLocal(getBucket().getId())) {
+            return true;
+          }
+          getProxyBucketRegion().getPartitionedRegion().checkReadiness();
+          if (isClosed()) {
+            return false;
+          }
+          long timeLeft = BUCKET_STORAGE_WAIT - timer.elapsedTimeMillis();
+          if (timeLeft <= 0) {
+            return false;
+          }
+          if (logger.isDebugEnabled()) {
+            logger.debug("Waiting for bucket storage" + this);
+          }
+          this.wait(timeLeft); // spurious wakeup ok
+        }
+      }
+      catch (InterruptedException e) {
+        // abort and return null
+        Thread.currentThread().interrupt();
+      }
+      return false;
+    }
+  }
   public void clearPrimaryElector() {
     synchronized(this) {
       primaryElector = null;


Mime
View raw message