geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [22/53] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Mon, 06 Jul 2015 18:15:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index c870544..0e43c25 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -8,6 +8,8 @@
 
 package com.gemstone.gemfire.internal.cache;
 
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -50,6 +52,7 @@ import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.DeltaSerializationException;
 import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.InternalGemFireException;
+import com.gemstone.gemfire.LogWriter;
 import com.gemstone.gemfire.SystemFailure;
 import com.gemstone.gemfire.admin.internal.SystemMemberCacheEventProcessor;
 import com.gemstone.gemfire.cache.AttributesMutator;
@@ -105,6 +108,11 @@ import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
 import com.gemstone.gemfire.cache.control.ResourceManager;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
 import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 import com.gemstone.gemfire.cache.query.FunctionDomainException;
 import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
@@ -150,7 +158,9 @@ import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
 import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
 import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
 import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
 import com.gemstone.gemfire.internal.cache.control.ResourceListener;
 import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
 import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultSender;
@@ -186,6 +196,13 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
 import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
 import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
@@ -440,10 +457,14 @@ public class LocalRegion extends AbstractRegion
    * This boolean is true when a member who has this region is running low on memory.
    * It is used to reject region operations.
    */
-  public final AtomicBoolean heapThresholdReached = new AtomicBoolean(false);
+  public final AtomicBoolean memoryThresholdReached = new AtomicBoolean(false);
 
   // Lock for updating PR MetaData on client side 
   public final Lock clientMetaDataLock = new ReentrantLock();
+  
+  
+  protected HdfsRegionManager hdfsManager;
+  protected HoplogListenerForRegion hoplogListener;
 
   /**
    * There seem to be cases where a region can be created and yet the
@@ -548,7 +569,7 @@ public class LocalRegion extends AbstractRegion
 
   ////////////////// Public Methods ///////////////////////////////////////////
 
-  static private String calcFullPath(String regionName, LocalRegion parentRegion) {
+  static String calcFullPath(String regionName, LocalRegion parentRegion) {
     StringBuilder buf = null;
     if (parentRegion == null) {
       buf = new StringBuilder(regionName.length() + 1);
@@ -579,6 +600,12 @@ public class LocalRegion extends AbstractRegion
     if (internalRegionArgs.getPartitionedRegion() != null) {
       myName = internalRegionArgs.getPartitionedRegion().getFullPath();
     }
+    this.offHeap = attrs.getOffHeap() || Boolean.getBoolean(myName+":OFF_HEAP");
+    if (getOffHeap()) {
+      if (cache.getOffHeapStore() == null) {
+        throw new IllegalStateException("The region " + myName + " was configured to use off heap memory but no off heap memory was configured.");
+      }
+    }
     
     this.initializationLatchBeforeGetInitialImage = new StoppableCountDownLatch(this.stopper, 1);
     this.initializationLatchAfterGetInitialImage = new StoppableCountDownLatch(this.stopper, 1);
@@ -608,6 +635,7 @@ public class LocalRegion extends AbstractRegion
       }
     }
 
+    this.hdfsManager = initHDFSManager();
     this.dsi = findDiskStore(attrs, internalRegionArgs);
     this.diskRegion = createDiskRegion(internalRegionArgs);
     this.entries = createRegionMap(internalRegionArgs);
@@ -661,12 +689,25 @@ public class LocalRegion extends AbstractRegion
         && !isUsedForMetaRegion() || isMetaRegionWithTransactions();
 
     this.testCallable = internalRegionArgs.getTestCallable();
-    // Create Listener only when Heap eviction is enabled, and BucketRegion is
-    // created
+    
+  }
+
+  private HdfsRegionManager initHDFSManager() {
+    HdfsRegionManager hdfsMgr = null;
+    if (this.getHDFSStoreName() != null) {
+      this.hoplogListener = new HoplogListenerForRegion();
+      HDFSRegionDirector.getInstance().setCache(cache);
+      hdfsMgr = HDFSRegionDirector.getInstance().manageRegion(this, 
+          this.getHDFSStoreName(), hoplogListener);
+    }
+    return hdfsMgr;
   }
 
   private RegionMap createRegionMap(InternalRegionArguments internalRegionArgs) {
     RegionMap result = null;
+	if ((internalRegionArgs.isReadWriteHDFSRegion()) && this.diskRegion != null) {
+      this.diskRegion.setEntriesMapIncompatible(true);
+    }
     if (this.diskRegion != null) {
       result = this.diskRegion.useExistingRegionMap(this);
     }
@@ -920,6 +961,7 @@ public class LocalRegion extends AbstractRegion
         validateRegionName(subregionName);
 
         validateSubregionAttributes(regionAttributes);
+        String regionPath = calcFullPath(subregionName, this);
 
         // lock down the subregionsLock
         // to prevent other threads from adding a region to it in toRegion
@@ -930,6 +972,11 @@ public class LocalRegion extends AbstractRegion
           existing = (LocalRegion)this.subregions.get(subregionName);
 
           if (existing == null) {
+            // create the async queue for HDFS if required. 
+            HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath,
+                regionAttributes, this.cache);
+            regionAttributes = cache.setEvictionAttributesForLargeRegion(
+                regionAttributes);
             if (regionAttributes.getScope().isDistributed()
                 && internalRegionArgs.isUsedForPartitionedRegionBucket()) {
               final PartitionedRegion pr = internalRegionArgs
@@ -938,10 +985,17 @@ public class LocalRegion extends AbstractRegion
               internalRegionArgs.setUserAttribute(pr.getUserAttribute());
               internalRegionArgs.setKeyRequiresRegionContext(pr
                   .keyRequiresRegionContext());
-              if(pr.isShadowPR()) {
-                newRegion = new BucketRegionQueue(subregionName, regionAttributes,
-                    this, this.cache, internalRegionArgs);
-              }else {
+              if (pr.isShadowPR()) {
+                if (!pr.isShadowPRForHDFS()) {
+                    newRegion = new BucketRegionQueue(subregionName, regionAttributes,
+                      this, this.cache, internalRegionArgs);
+                }
+                else {
+                   newRegion = new HDFSBucketRegionQueue(subregionName, regionAttributes,
+                      this, this.cache, internalRegionArgs);
+                }
+                
+              } else {
                 newRegion = new BucketRegion(subregionName, regionAttributes,
                     this, this.cache, internalRegionArgs);  
               }
@@ -1000,10 +1054,19 @@ public class LocalRegion extends AbstractRegion
             (UserSpecifiedRegionAttributes)regionAttributes).getIndexes());  
         }
         newRegion.initialize(snapshotInputStream, imageTarget, internalRegionArgs); // releases initialization Latches
-        //register the region with resource manager to get heap events
+        //register the region with resource manager to get memory events
         if(!newRegion.isInternalRegion()){
           if (!newRegion.isDestroyed) {
-            cache.getResourceManager().addResourceListener(newRegion);
+            cache.getResourceManager().addResourceListener(ResourceType.MEMORY, newRegion);
+            
+            if (!newRegion.getOffHeap()) {
+              newRegion.initialCriticalMembers(cache.getResourceManager().getHeapMonitor().getState().isCritical(), cache
+                  .getResourceAdvisor().adviseCritialMembers());
+            } else {
+              newRegion.initialCriticalMembers(cache.getResourceManager().getHeapMonitor().getState().isCritical()
+                  || cache.getResourceManager().getOffHeapMonitor().getState().isCritical(), cache.getResourceAdvisor()
+                  .adviseCritialMembers());
+            }
 
             // synchronization would be done on ManagementAdapter.regionOpLock
             // instead of destroyLock in LocalRegion? ManagementAdapter is one
@@ -1056,33 +1119,40 @@ public class LocalRegion extends AbstractRegion
     long startPut = CachePerfStats.getStatTime();
     EntryEventImpl event = newCreateEntryEvent(key, value, aCallbackArgument);
     validatedCreate(event, startPut);
+    // TODO OFFHEAP: validatedCreate calls freeOffHeapResources
   }
 
   public final void validatedCreate(EntryEventImpl event, long startPut)
       throws TimeoutException, EntryExistsException, CacheWriterException {
 
-    if (event.getEventId() == null && generateEventID()) {
-      event.setNewEventId(cache.getDistributedSystem());
-    }
-    //Fix for 42448 - Only make create with null a local invalidate for
-    //normal regions. Otherwise, it will become a distributed invalidate.
-    if(getDataPolicy() == DataPolicy.NORMAL) {
-      event.setLocalInvalid(true);
-    }
-    discoverJTA();
-    if (!basicPut(event,
-                  true,  // ifNew
-                  false, // ifOld
-                  null,  // expectedOldValue
-                  true // requireOldValue TODO txMerge why is oldValue required for create? I think so that the EntryExistsException will have it.
-                  )) {
-      throw new EntryExistsException(event.getKey().toString(),
-          event.getOldValue());
-    }
-    else {
-      if (! getDataView().isDeferredStats()) {
-        getCachePerfStats().endPut(startPut, false);
+    try {
+      if (event.getEventId() == null && generateEventID()) {
+        event.setNewEventId(cache.getDistributedSystem());
+      }
+      assert event.isFetchFromHDFS() : "validatedPut() should have been called";
+      // Fix for 42448 - Only make create with null a local invalidate for
+      // normal regions. Otherwise, it will become a distributed invalidate.
+      if (getDataPolicy() == DataPolicy.NORMAL) {
+        event.setLocalInvalid(true);
+      }
+      discoverJTA();
+      if (!basicPut(event, true, // ifNew
+          false, // ifOld
+          null, // expectedOldValue
+          true // requireOldValue TODO txMerge why is oldValue required for
+               // create? I think so that the EntryExistsException will have it.
+      )) {
+        throw new EntryExistsException(event.getKey().toString(),
+            event.getOldValue());
+      } else {
+        if (!getDataView().isDeferredStats()) {
+          getCachePerfStats().endPut(startPut, false);
+        }
       }
+    } finally {
+
+      event.release();
+
     }
   }
 
@@ -1095,7 +1165,7 @@ public class LocalRegion extends AbstractRegion
     checkReadiness();
     checkForLimitedOrNoAccess();
 
-    return new EntryEventImpl(this, Operation.CREATE, key,
+    return EntryEventImpl.create(this, Operation.CREATE, key,
         value, aCallbackArgument, false, getMyId())
         /* to distinguish genuine create */.setCreate(true);
   }
@@ -1120,7 +1190,8 @@ public class LocalRegion extends AbstractRegion
   public final Object destroy(Object key, Object aCallbackArgument)
       throws TimeoutException, EntryNotFoundException, CacheWriterException {
     EntryEventImpl event = newDestroyEntryEvent(key, aCallbackArgument);
-      return validatedDestroy(key, event);
+    return validatedDestroy(key, event);
+    // TODO OFFHEAP: validatedDestroy calls freeOffHeapResources
   }
 
   /**
@@ -1129,14 +1200,21 @@ public class LocalRegion extends AbstractRegion
    */
   public Object validatedDestroy(Object key, EntryEventImpl event)
       throws TimeoutException, EntryNotFoundException, CacheWriterException
-  {
-    if (event.getEventId() == null && generateEventID()) {
-      event.setNewEventId(cache.getDistributedSystem());
+ {
+    try {
+      if (event.getEventId() == null && generateEventID()) {
+        event.setNewEventId(cache.getDistributedSystem());
+      }
+      basicDestroy(event, true, // cacheWrite
+          null); // expectedOldValue
+      if (event.isOldValueOffHeap()) {
+        return null;
+      } else {
+        return handleNotAvailable(event.getOldValue());
+      }
+    } finally {
+      event.release();
     }
-    basicDestroy(event,
-                 true,  // cacheWrite
-                 null); // expectedOldValue
-    return handleNotAvailable(event.getOldValue());
   }
 
   // split into a separate newDestroyEntryEvent since SQLFabric may need to
@@ -1148,7 +1226,7 @@ public class LocalRegion extends AbstractRegion
     checkReadiness();
     checkForLimitedOrNoAccess();
 
-    return new EntryEventImpl(this, Operation.DESTROY, key,
+    return EntryEventImpl.create(this, Operation.DESTROY, key,
         null/* newValue */, aCallbackArgument, false, getMyId());
   }
 
@@ -1179,14 +1257,22 @@ public class LocalRegion extends AbstractRegion
    * @param preferCD true if the preferred result form is CachedDeserializable
    * @param clientEvent client's event, if any (for version tag retrieval)
    * @param returnTombstones TODO
+   * @param retainResult if true then the result may be a retained off-heap reference
    * @return the value for the given key
    */
-  public final Object getDeserializedValue(final KeyInfo keyInfo, final boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones) {
+  public final Object getDeserializedValue(RegionEntry re, final KeyInfo keyInfo, final boolean updateStats, boolean disableCopyOnRead, 
+  boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
     if (this.diskRegion != null) {
       this.diskRegion.setClearCountReference();
     }
     try {
-      RegionEntry re = this.entries.getEntry(keyInfo.getKey());
+      if (re == null) {
+        if (allowReadFromHDFS) {
+          re = this.entries.getEntry(keyInfo.getKey());
+        } else {
+          re = this.entries.getOperationalEntryInVM(keyInfo.getKey());
+        }
+      }
       //skip updating the stats if the value is null
       // TODO - We need to clean up the callers of the this class so that we can
       // update the statistics here, where it would make more sense.
@@ -1200,7 +1286,7 @@ public class LocalRegion extends AbstractRegion
         try {
         synchronized(re) { // bug #51059 value & version must be obtained atomically
           clientEvent.setVersionTag(re.getVersionStamp().asVersionTag());
-          value = getDeserialized(re, updateStats, disableCopyOnRead, preferCD);
+          value = getDeserialized(re, updateStats, disableCopyOnRead, preferCD, retainResult);
         }
         } finally {
           if (disabled) {
@@ -1214,7 +1300,7 @@ public class LocalRegion extends AbstractRegion
           }
         }
       } else {
-        value = getDeserialized(re, updateStats, disableCopyOnRead, preferCD);
+        value = getDeserialized(re, updateStats, disableCopyOnRead, preferCD, retainResult);
       }
       if (logger.isTraceEnabled() && !(this instanceof HARegion)) {
         logger.trace("getDeserializedValue for {} returning version: {} returnTombstones: {} value: {}",
@@ -1235,6 +1321,7 @@ public class LocalRegion extends AbstractRegion
    * @param updateStats
    * @param disableCopyOnRead if true then do not make a copy on read
    * @param preferCD true if the preferred result form is CachedDeserializable
+   * @param retainResult if true then the result may be a retained off-heap reference
    * @return the value found, which can be
    *  <ul>
    *    <li> null if the value was removed from the region entry
@@ -1242,11 +1329,17 @@ public class LocalRegion extends AbstractRegion
    *    <li>Token.LOCAL_INVALID if the value of the region entry is local invalid
    *  </ul>
    */
-  protected final Object getDeserialized(RegionEntry re, boolean updateStats, boolean disableCopyOnRead, boolean preferCD) {
+  @Retained
+  protected final Object getDeserialized(RegionEntry re, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, boolean retainResult) {
+    assert !retainResult || preferCD;
     try {
-      Object v = null;
+      @Retained Object v = null;
       try {
-         v = re.getValue(this); // OFFHEAP: incrc, deserialize, decrc TODO: optimize if preferCD but need to track down when to decrc in that case
+        if (retainResult) {
+          v = re.getValueRetain(this);
+        } else {
+          v = re.getValue(this);
+        }
       } catch(DiskAccessException dae) {
         this.handleDiskAccessException(dae);
         throw dae;
@@ -1260,7 +1353,7 @@ public class LocalRegion extends AbstractRegion
         if (!preferCD) {
           if (isCopyOnRead()) {
             if (disableCopyOnRead) {
-                v = ((CachedDeserializable)v).getDeserializedForReading();
+              v = ((CachedDeserializable)v).getDeserializedForReading();
             } else {
               v = ((CachedDeserializable)v).getDeserializedWritableCopy(this, re);
             }
@@ -1288,7 +1381,7 @@ public class LocalRegion extends AbstractRegion
   public Object get(Object key, Object aCallbackArgument,
       boolean generateCallbacks, EntryEventImpl clientEvent) throws TimeoutException, CacheLoaderException
   {
-    Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false);
+    Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false, true/*allowReadFromHDFS*/);
     if (Token.isInvalid(result)) {
       result = null;
     }
@@ -1300,18 +1393,42 @@ public class LocalRegion extends AbstractRegion
    */
   public Object get(Object key, Object aCallbackArgument,
 	      boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
-	      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException {
+	      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException {
 	  return get(key, aCallbackArgument,
-		      generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false);
+		      generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS, false);
   }
   
   /**
+   * The result of this operation may be an off-heap reference that the caller must release
+   */
+  @Retained
+  public Object getRetained(Object key, Object aCallbackArgument,
+      boolean generateCallbacks, boolean disableCopyOnRead,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException {
+    return getRetained(key, aCallbackArgument,
+              generateCallbacks, disableCopyOnRead, requestingClient, clientEvent, returnTombstones, false);
+  }
+
+  /**
+   * The result of this operation may be an off-heap reference that the caller must release.
    * @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local
    */
+  @Retained
+  public Object getRetained(Object key, Object aCallbackArgument,
+      boolean generateCallbacks, boolean disableCopyOnRead,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException {
+    return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, true, false);
+  }
+  /**
+   * @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local
+   * @param retainResult if true then the result may be a retained off-heap reference.
+   */
   public Object get(Object key, Object aCallbackArgument,
       boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
-      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, 
+	  boolean opScopeIsLocal, boolean allowReadFromHDFS, boolean retainResult) throws TimeoutException, CacheLoaderException
   {
+    assert !retainResult || preferCD;
     validateKey(key);
     validateCallbackArg(aCallbackArgument);
     checkReadiness();
@@ -1321,7 +1438,8 @@ public class LocalRegion extends AbstractRegion
     long start = stats.startGet();
     boolean isMiss = true;
     try {
-      Object value = getDataView().getDeserializedValue(getKeyInfo(key), this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones);
+      KeyInfo keyInfo = getKeyInfo(key, aCallbackArgument);
+      Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
       final boolean isCreate = value == null;
       isMiss = value == null || Token.isInvalid(value)
           || (!returnTombstones && value == Token.TOMBSTONE);
@@ -1334,13 +1452,14 @@ public class LocalRegion extends AbstractRegion
         // if scope is local and there is no loader, then
         // don't go further to try and get value
         if (!opScopeIsLocal
-            && (getScope().isDistributed()
+            && ((getScope().isDistributed() && !isHDFSRegion())
                 || hasServerProxy()
                 || basicGetLoader() != null)) { 
           // serialize search/load threads if not in txn
-          value = getDataView().findObject(getKeyInfo(key, aCallbackArgument),
+          // TODO OFFHEAP OPTIMIZE: findObject can be enhanced to use the retainResult flag
+          value = getDataView().findObject(keyInfo,
               this, isCreate, generateCallbacks, value, disableCopyOnRead,
-              preferCD, requestingClient, clientEvent, returnTombstones);
+              preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
           if (!returnTombstones && value == Token.TOMBSTONE) {
             value = null;
           }
@@ -1366,7 +1485,7 @@ public class LocalRegion extends AbstractRegion
    */
   final public void recordMiss(final RegionEntry re, Object key) {
     final RegionEntry e;
-    if (re == null && !isTX()) {
+    if (re == null && !isTX() && !isHDFSRegion()) {
       e = basicGetEntry(key);
     } else {
       e = re;
@@ -1375,6 +1494,41 @@ public class LocalRegion extends AbstractRegion
   }
 
   /**
+   * @return true if this region has been configured for HDFS persistence
+   */
+  public boolean isHDFSRegion() {
+    return false;
+  }
+
+  /**
+   * @return true if this region is configured to read and write data from HDFS
+   */
+  public boolean isHDFSReadWriteRegion() {
+    return false;
+  }
+
+  /**
+   * @return true if this region is configured to only write to HDFS
+   */
+  protected boolean isHDFSWriteOnly() {
+    return false;
+  }
+
+  /**
+   * FOR TESTING ONLY
+   */
+  public HoplogListenerForRegion getHoplogListener() {
+    return hoplogListener;
+  }
+  
+  /**
+   * FOR TESTING ONLY
+   */
+  public HdfsRegionManager getHdfsRegionManager() {
+    return hdfsManager;
+  }
+  
+  /**
    * optimized to only allow one thread to do a search/load, other threads wait
    * on a future
    *
@@ -1390,15 +1544,18 @@ public class LocalRegion extends AbstractRegion
    * @param clientEvent the client event, if any
    * @param returnTombstones whether to return tombstones
    */
+  @Retained
   Object nonTxnFindObject(KeyInfo keyInfo, boolean p_isCreate,
       boolean generateCallbacks, Object p_localValue, boolean disableCopyOnRead, boolean preferCD,
-      EntryEventImpl clientEvent, boolean returnTombstones) 
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) 
       throws TimeoutException, CacheLoaderException
   {
+    final Object key = keyInfo.getKey();
+  
     Object localValue = p_localValue;
     boolean isCreate = p_isCreate;
     Object[] valueAndVersion = null;
-    Object result = null;
+    @Retained Object result = null;
     FutureResult thisFuture = new FutureResult(this.stopper);
     Future otherFuture = (Future)this.getFutures.putIfAbsent(keyInfo.getKey(), thisFuture);
     // only one thread can get their future into the map for this key at a time
@@ -1418,9 +1575,17 @@ public class LocalRegion extends AbstractRegion
             } else {
               result = cd.getDeserializedForReading();
             }
+           
           } else if (!disableCopyOnRead) {
             result = conditionalCopy(result);
           }
+          //For sqlf since the deserialized value is nothing but chunk
+          // before returning the found value increase its use count
+          if(GemFireCacheImpl.sqlfSystem() && result instanceof Chunk) {
+            if(!((Chunk)result).retain()) {
+              return null;
+            }
+          }
           // what was a miss is now a hit
           RegionEntry re = null;
           if (isCreate) {
@@ -1446,7 +1611,8 @@ public class LocalRegion extends AbstractRegion
     // didn't find a future, do one more getDeserialized to catch race
     // condition where the future was just removed by another get thread
     try {
-      localValue = getDeserializedValue(keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false);
+      localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false);
+      // TODO verify that this method is not used for PR or BR and hence allowReadFromHDFS does not matter
       // stats have now been updated
       if (localValue != null && !Token.isInvalid(localValue)) {
         result = localValue;
@@ -1455,7 +1621,7 @@ public class LocalRegion extends AbstractRegion
       isCreate = localValue == null;
 
       result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
-          localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones);
+          localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
       
       if (result == null && localValue != null) {
         if (localValue != Token.TOMBSTONE || returnTombstones) {
@@ -1486,6 +1652,7 @@ public class LocalRegion extends AbstractRegion
       && this.cache.isCopyOnRead()
       && ! this.isUsedForPartitionedRegionAdmin
       && ! this.isUsedForMetaRegion
+      && ! getOffHeap()
       && ! isSecret();
   }
 
@@ -1554,37 +1721,46 @@ public class LocalRegion extends AbstractRegion
       throws TimeoutException, CacheWriterException {
     long startPut = CachePerfStats.getStatTime();
     EntryEventImpl event = newUpdateEntryEvent(key, value, aCallbackArgument);
-      return validatedPut(event, startPut);
+     //Since Sqlfire directly calls validatedPut, the freeing is done in
+    // validatedPut
+     return validatedPut(event, startPut);
+     // TODO OFFHEAP: validatedPut calls freeOffHeapResources
+    
   }
 
   public final Object validatedPut(EntryEventImpl event, long startPut)
       throws TimeoutException, CacheWriterException {
 
-    if (event.getEventId() == null && generateEventID()) {
-      event.setNewEventId(cache.getDistributedSystem());
+    try {
+      if (event.getEventId() == null && generateEventID()) {
+        event.setNewEventId(cache.getDistributedSystem());
+      }
+      Object oldValue = null;
+      // Sqlf changes begin
+      // see #40294.
+
+      // Rahul: this has to be an update.
+      // so executing it as an update.
+      boolean forceUpdateForDelta = event.hasDelta();
+      // Sqlf Changes end.
+      if (basicPut(event, false, // ifNew
+          forceUpdateForDelta, // ifOld
+          null, // expectedOldValue
+          false // requireOldValue
+      )) {
+        if (!event.isOldValueOffHeap()) {
+          // don't copy it to heap just to return from put.
+          // TODO: come up with a better way to do this.
+          oldValue = event.getOldValue();
+        }
+        if (!getDataView().isDeferredStats()) {
+          getCachePerfStats().endPut(startPut, false);
+        }
+      }
+      return handleNotAvailable(oldValue);
+    } finally {
+      event.release();
     }
-    Object oldValue = null;
-    //Sqlf changes begin
-    // see #40294.
-    
-    //Rahul: this has to be an update.
-    // so executing it as an update.
-    boolean forceUpdateForDelta = event.hasDelta();
-    // Sqlf Changes end.
-    if (basicPut(event,
-                   false, // ifNew
-                   forceUpdateForDelta, // ifOld
-                   null,  // expectedOldValue
-                   false // requireOldValue
-                   )) {
-      // don't copy it to heap just to return from put.
-      // TODO: come up with a better way to do this.
-      oldValue = event.getOldValue();
-      if (!getDataView().isDeferredStats()) {
-        getCachePerfStats().endPut(startPut, false);
-      }
-    }
-    return handleNotAvailable(oldValue);
   }
 
   // split into a separate newUpdateEntryEvent since SQLFabric may need to
@@ -1605,13 +1781,29 @@ public class LocalRegion extends AbstractRegion
     // was modified to call the other EntryEventImpl constructor so that
     // an id will be generated by default. Null was passed in anyway.
     //   generate EventID
-    final EntryEventImpl event = new EntryEventImpl(
+    final EntryEventImpl event = EntryEventImpl.create(
         this, Operation.UPDATE, key,
         value, aCallbackArgument, false, getMyId());
+    boolean eventReturned = false;
+    try {
     extractDeltaIntoEvent(value, event);
+    eventReturned = true;
     return event;
+    } finally {
+      if (!eventReturned) event.release();
+    }
+  }
+  /**
+   * Creates an EntryEventImpl that is optimized to not fetch data from HDFS.
+   * This is meant to be used by PUT dml from GemFireXD.
+   */
+  public final EntryEventImpl newPutEntryEvent(Object key, Object value,
+      Object aCallbackArgument) {
+    EntryEventImpl ev = newUpdateEntryEvent(key, value, aCallbackArgument);
+    ev.setFetchFromHDFS(false);
+    ev.setPutDML(true);
+    return ev;
   }
-
   private void extractDeltaIntoEvent(Object value, EntryEventImpl event) {
     // 1. Check for DS-level delta property.
     // 2. Default value for operation type is UPDATE, so no need to check that here.
@@ -1740,7 +1932,37 @@ public class LocalRegion extends AbstractRegion
     }
   }
 
-  /**
+  protected boolean includeHDFSResults() {
+    return isUsedForPartitionedRegionBucket() 
+        && isHDFSReadWriteRegion() 
+        && getPartitionedRegion().includeHDFSResults();
+  }
+  
+
+  /** a fast estimate of total number of entries locally in the region */
+  public long getEstimatedLocalSize() {
+    RegionMap rm;
+    if (!this.isDestroyed) {
+      long size;
+      if (isHDFSReadWriteRegion() && this.initialized) {
+        // this size is not used by HDFS region iterators
+        // fixes bug 49239
+        return 0;
+      }
+      // if region has not been initialized yet, then get the estimate from
+      // disk region's recovery map if available
+      if (!this.initialized && this.diskRegion != null
+          && (rm = this.diskRegion.getRecoveredEntryMap()) != null
+          && (size = rm.size()) > 0) {
+        return size;
+      }
+      if ((rm = getRegionMap()) != null) {
+        return rm.size();
+      }
+    }
+    return 0;
+  }
+    /**
    * @param keyInfo
    * @param access
    *          true if caller wants last accessed time updated
@@ -1781,6 +2003,16 @@ public class LocalRegion extends AbstractRegion
     return this.cache.isClosed();
   }
 
+  /**
+   * Returns true if this region is or has been closed or destroyed.
+   * Note that unlike {@link #isDestroyed()} this method will not
+   * return true if the cache is closing but has not yet started closing
+   * this region.
+   */
+  public boolean isThisRegionBeingClosedOrDestroyed() {
+    return this.isDestroyed;
+  }
+  
   /** returns true if this region has been destroyed */
   public boolean isDestroyed()
   {
@@ -1941,9 +2173,15 @@ public class LocalRegion extends AbstractRegion
       RegionEntry entry = this.entries.getEntry(keyInfo.getKey());
       boolean result = entry != null;
       if (result) {
+        SimpleMemoryAllocatorImpl.skipRefCountTracking();
         Object val = entry.getTransformedValue(); // no need to decompress since we only want to know if we have an existing value 
-        // No need to deserialize because of Bruce's fix in r30960 for bug 42162. See bug 42732.
-
+        if (val instanceof StoredObject) {
+          OffHeapHelper.release(val);
+          SimpleMemoryAllocatorImpl.unskipRefCountTracking();
+          return true;
+        }
+        SimpleMemoryAllocatorImpl.unskipRefCountTracking();
+        // No need to to check CachedDeserializable because of Bruce's fix in r30960 for bug 42162. See bug 42732.
         // this works because INVALID and LOCAL_INVALID will never be faulted out of mem
         // If val is NOT_AVAILABLE that means we have a valid value on disk.
         result = !Token.isInvalidOrRemoved(val);
@@ -1992,11 +2230,25 @@ public class LocalRegion extends AbstractRegion
    *
    * author David Whitlock
    */
-  public int entryCount()
-  {
+  public final int entryCount() {
+    return entryCount(null);
+  }
+
+  public int entryCount(Set<Integer> buckets) {
+    return entryCount(buckets, false);
+  }
+
+  protected int entryCount( Set<Integer> buckets, boolean estimate) {
+    assert buckets == null: "unexpected buckets " + buckets + " for region "
+        + toString();
+
     return getDataView().entryCount(this);
   }
 
+  public int entryCountEstimate(final TXStateInterface tx, Set<Integer> buckets, boolean entryCountEstimate) {
+    return entryCount(buckets, entryCountEstimate);
+  }
+
   /**
    * @return size after considering imageState
    */
@@ -2008,6 +2260,9 @@ public class LocalRegion extends AbstractRegion
       if (this.imageState.isClient() && !this.concurrencyChecksEnabled) {
         return result - this.imageState.getDestroyedEntriesCount();
       }
+	if (includeHDFSResults()) {
+      return result;
+    }
       return result - this.tombstoneCount.get();
     }
   }
@@ -2126,13 +2381,17 @@ public class LocalRegion extends AbstractRegion
   protected void validatedInvalidate(Object key, Object aCallbackArgument)
       throws TimeoutException, EntryNotFoundException
   {
-    EntryEventImpl event = new EntryEventImpl(
+    EntryEventImpl event = EntryEventImpl.create(
         this, Operation.INVALIDATE,
         key, null, aCallbackArgument, false, getMyId());
+    try {
     if (generateEventID()) {
       event.setNewEventId(cache.getDistributedSystem());
     }
     basicInvalidate(event);
+    } finally {
+      event.release();
+    }
   }
 
   public void localDestroy(Object key, Object aCallbackArgument)
@@ -2141,7 +2400,7 @@ public class LocalRegion extends AbstractRegion
     validateKey(key);
     checkReadiness();
     checkForNoAccess();
-    EntryEventImpl event = new EntryEventImpl(
+    EntryEventImpl event = EntryEventImpl.create(
         this,
         Operation.LOCAL_DESTROY, key, null, aCallbackArgument, false, getMyId());
     if (generateEventID()) {
@@ -2159,6 +2418,8 @@ public class LocalRegion extends AbstractRegion
     catch (TimeoutException e) {
       // no distributed lock
       throw new Error(LocalizedStrings.LocalRegion_NO_DISTRIBUTED_LOCK_SHOULD_HAVE_BEEN_ATTEMPTED_FOR_LOCALDESTROY.toLocalizedString(), e);
+    } finally {
+      event.release();
     }
   }
 
@@ -2210,15 +2471,19 @@ public class LocalRegion extends AbstractRegion
     checkReadiness();
     checkForNoAccess();
 
-    EntryEventImpl event = new EntryEventImpl(
+    EntryEventImpl event = EntryEventImpl.create(
         this,
         Operation.LOCAL_INVALIDATE, key, null/* newValue */, callbackArgument,
         false, getMyId());
+    try {
     if (generateEventID()) {
       event.setNewEventId(cache.getDistributedSystem());
     }
     event.setLocalInvalid(true);
     basicInvalidate(event);
+    } finally {
+      event.release();
+    }
   }
 
   public void localInvalidateRegion(Object aCallbackArgument)
@@ -2274,7 +2539,7 @@ public class LocalRegion extends AbstractRegion
       // Subclasses may have already called this method, but this is
       // acceptable because addResourceListener won't add it twice
       if (!this.isDestroyed) {
-        cache.getResourceManager().addResourceListener(this);
+        cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
       }
     }
 
@@ -2365,6 +2630,7 @@ public class LocalRegion extends AbstractRegion
       this.indexManager = IndexUtils.getIndexManager(this, true);
     }
     Set<Index> indexes = new HashSet<Index>();
+    Set<Index> prIndexes = new HashSet<Index>();
     int initLevel = 0;
     DiskRegion dr = this.getDiskRegion();
     boolean isOverflowToDisk = false;
@@ -2395,7 +2661,8 @@ public class LocalRegion extends AbstractRegion
             //load entries during initialization only for non overflow regions
             indexes.add(this.indexManager.createIndex(icd.getIndexName(), icd.getIndexType(), 
                 icd.getIndexExpression(), icd.getIndexFromClause(), 
-                icd.getIndexImportString(), externalContext, icd.getPartitionedIndex(), !isOverflowToDisk));           
+                icd.getIndexImportString(), externalContext, icd.getPartitionedIndex(), !isOverflowToDisk));
+            prIndexes.add(icd.getPartitionedIndex());
           } else {
             if (logger.isDebugEnabled()) {
               logger.debug("QueryService Index creation process for {}" + icd.getIndexName());
@@ -2432,6 +2699,9 @@ public class LocalRegion extends AbstractRegion
         // Setting the populate flag to true so that the indexes can apply updates.
         this.indexManager.setPopulateFlagForIndexes(indexes);
       }
+      //due to bug #52096, the pr index populate flags were not being set 
+      //we should revisit and clean up the index creation code paths
+      this.indexManager.setPopulateFlagForIndexes(prIndexes);
     }
     getCachePerfStats().endIndexInitialization(start);
   }
@@ -2653,7 +2923,7 @@ public class LocalRegion extends AbstractRegion
       this.isDestroyed = true;
       // after isDestroyed is set to true call removeResourceListener to fix bug 49555
       this.cache.getResourceManager(false).removeResourceListener(this);
-      this.entries.clear(null);
+      closeEntries();
       if (logger.isDebugEnabled()) {
         logger.debug("recursiveDestroyRegion: Region Destroyed: {}", getFullPath());
       }
@@ -2690,6 +2960,13 @@ public class LocalRegion extends AbstractRegion
       }
     }
   }
+  
+  public void closeEntries() {
+    this.entries.close();
+  }
+  public Set<VersionSource> clearEntries(RegionVersionVector rvv) {
+    return this.entries.clear(rvv);
+  }
 
   @Override
   public void checkReadiness()
@@ -2698,6 +2975,19 @@ public class LocalRegion extends AbstractRegion
   }
 
   /**
+   * This method should be called when the caller cannot locate an entry and that condition
+   * is unexpected.  This will first double check the cache and region state before throwing
+   * an EntryNotFoundException.  EntryNotFoundException should be a last resort exception.
+   * 
+   * @param entryKey the missing entry's key.
+   */
+  public void checkEntryNotFound(Object entryKey) {
+    checkReadiness();
+    // Localized string for partitioned region is generic enough for general use
+    throw new EntryNotFoundException(LocalizedStrings.PartitionedRegion_ENTRY_NOT_FOUND_FOR_KEY_0.toLocalizedString(entryKey));    
+  }
+  
+  /**
 
    * Search for the value in a server (if one exists),
    * then try a loader.
@@ -2708,11 +2998,11 @@ public class LocalRegion extends AbstractRegion
    * @param clientEvent the client's event, if any.  If not null, we set the version tag
    * @param returnTombstones TODO
    * @return the deserialized value
-   * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
+   * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean )
    */
   protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
       TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
-      EntryEventImpl clientEvent, boolean returnTombstones)
+      EntryEventImpl clientEvent, boolean returnTombstones,  boolean allowReadFromHDFS)
       throws CacheLoaderException, TimeoutException
   {
     final Object key = keyInfo.getKey();
@@ -2728,15 +3018,19 @@ public class LocalRegion extends AbstractRegion
       ServerRegionProxy mySRP = getServerProxy();
       if (mySRP != null) {
         holder = EntryEventImpl.createVersionTagHolder();
-        value = mySRP.get(key, aCallbackArgument, holder);
-        fromServer = value != null;
+        try {
+          value = mySRP.get(key, aCallbackArgument, holder);
+          fromServer = value != null;
+        } finally {
+          holder.release();
+        }
       }
     }
     
     /*
      * If we didn't get anything from the server, try the loader
      */
-    if (!fromServer) {
+    if (!fromServer || value == Token.TOMBSTONE) {
       // copy into local var to prevent race condition
       CacheLoader loader = basicGetLoader();
       if (loader != null) {
@@ -2749,6 +3043,7 @@ public class LocalRegion extends AbstractRegion
         long statStart = stats.startLoad();
         try {
           value = loader.load(loaderHelper);
+          fromServer = false;
         }
         finally {
           stats.endLoad(statStart);
@@ -2756,12 +3051,18 @@ public class LocalRegion extends AbstractRegion
       }
     }
 
+    // don't allow tombstones into a client cache if it doesn't
+    // have concurrency checks enabled
+    if (fromServer && 
+        value == Token.TOMBSTONE && !this.concurrencyChecksEnabled) {
+      value = null;
+    }
     
     /*
      * If we got a value back, let's put it in the cache.
      */
     RegionEntry re = null;
-    if (value != null && !isHeapThresholdReachedForLoad()) {
+    if (value != null && !isMemoryThresholdReachedForLoad()) {
 
       long startPut = CachePerfStats.getStatTime();
       validateKey(key);
@@ -2774,8 +3075,9 @@ public class LocalRegion extends AbstractRegion
       }
 
       EntryEventImpl event
-        = new EntryEventImpl(this, op, key, value, aCallbackArgument,
+        = EntryEventImpl.create(this, op, key, value, aCallbackArgument,
                              false, getMyId(), generateCallbacks);
+      try {
 
       // bug #47716 - do not put an invalid entry into the cache if there's
       // already one there with the same version
@@ -2802,7 +3104,7 @@ public class LocalRegion extends AbstractRegion
             clientEvent.setVersionTag(event.getVersionTag());
             clientEvent.isConcurrencyConflict(event.isConcurrencyConflict());
           }
-          if (fromServer && (event.getNewValue() == Token.TOMBSTONE)) {
+          if (fromServer && (event.getRawNewValue() == Token.TOMBSTONE)) {
             return null; // tombstones are destroyed entries
           }
         } catch (ConcurrentCacheModificationException e) {
@@ -2821,6 +3123,9 @@ public class LocalRegion extends AbstractRegion
           logger.debug("findObjectInSystem: writer exception putting entry {}", event, cwe);
         }
       }
+      } finally {     
+          event.release();        
+      }
     }
     if (isCreate) {
       recordMiss(re, key);
@@ -2828,8 +3133,8 @@ public class LocalRegion extends AbstractRegion
     return value;
   }
 
-  protected boolean isHeapThresholdReachedForLoad() {
-    return this.heapThresholdReached.get();
+  protected boolean isMemoryThresholdReachedForLoad() {
+    return this.memoryThresholdReached.get();
   }
   
   /**
@@ -2843,6 +3148,7 @@ public class LocalRegion extends AbstractRegion
    * @return whether the entry is already invalid
    */
   protected boolean alreadyInvalid(Object key, EntryEventImpl event) {
+    @Unretained(ENTRY_EVENT_NEW_VALUE)
     Object newValue = event.getRawNewValue();
     if (newValue == null || Token.isInvalid(newValue)) {
       RegionEntry entry = this.entries.getEntry(key);
@@ -2968,6 +3274,8 @@ public class LocalRegion extends AbstractRegion
         // @todo grid: is the newEntry flag needed?
         Object key = event.getKey();
         Object value = event.getRawNewValue();
+        // serverPut is called by cacheWriteBeforePut so the new value will not yet be off-heap
+        // TODO OFFHEAP: verify that the above assertion is true
         Object callbackArg = event.getRawCallbackArgument();
         boolean isCreate = event.isCreate(); 
         Object result = mySRP.put(key, value, event.getDeltaBytes(), event,
@@ -3404,14 +3712,15 @@ public class LocalRegion extends AbstractRegion
    * @param keyInfo
    * @return TODO
    */
+  @Retained
   protected Object nonTXbasicGetValueInVM(KeyInfo keyInfo) {
     RegionEntry re = this.entries.getEntry(keyInfo.getKey());
     if (re == null) {
-      throw new EntryNotFoundException(keyInfo.getKey().toString());
+      checkEntryNotFound(keyInfo.getKey());
     }
     Object v = re.getValueInVM(this); // OFFHEAP returned to callers
     if (Token.isRemoved(v)) {
-      throw new EntryNotFoundException(keyInfo.getKey().toString());
+      checkEntryNotFound(keyInfo.getKey());
     }
     if (v == Token.NOT_AVAILABLE) {
       return null;
@@ -4145,7 +4454,7 @@ public class LocalRegion extends AbstractRegion
     }
     checkReadiness();
     validateKey(key);
-    EntryEventImpl event = new EntryEventImpl(this, Operation.LOCAL_DESTROY,
+    EntryEventImpl event = EntryEventImpl.create(this, Operation.LOCAL_DESTROY,
         key, false, getMyId(), false /* generateCallbacks */, true);
     try {
       basicDestroy(event,
@@ -4162,6 +4471,8 @@ public class LocalRegion extends AbstractRegion
     }
     catch (EntryNotFoundException e) {
       // not a problem
+    } finally {
+      event.release();
     }
   }
 
@@ -4355,7 +4666,6 @@ public class LocalRegion extends AbstractRegion
             if (currentKey == null || getImageState().hasDestroyedEntry(currentKey)) {
               continue;
             }
-            localDestroyNoCallbacks(currentKey);
             Object val = entry.getObject();
             boolean isBytes = entry.isBytes();
             boolean isKeyOnServer = !entry.isKeyNotOnServer();
@@ -4366,12 +4676,15 @@ public class LocalRegion extends AbstractRegion
             if (val instanceof Throwable) {
               logger.warn(LocalizedMessage.create(LocalizedStrings.LocalRegion_CAUGHT_THE_FOLLOWING_EXCEPTION_FOR_KEY_0_WHILE_PERFORMING_A_REMOTE_GETALL,
                       currentKey), (Throwable)val);
-              //TODO - shouldn't we skip this value and not put it in the region
-              //then? I think we're putting an Exception object in the region.
+              localDestroyNoCallbacks(currentKey);
+              continue;
             } else {
               if(logger.isDebugEnabled()) {
                 logger.debug("refreshEntries key={} value={}", currentKey, entry);
               }
+              if (tag == null) { // no version checks
+                localDestroyNoCallbacks(currentKey);
+              }
             }
             
             if(val instanceof byte[] && !isBytes) {
@@ -4642,6 +4955,7 @@ public class LocalRegion extends AbstractRegion
   }
 
   /** Package helper method */
+  @Retained
   Object getEntryValue(RegionEntry entry)
   {
     if (entry == null) {
@@ -5096,6 +5410,9 @@ public class LocalRegion extends AbstractRegion
     // Notify bridge clients (if this is a BridgeServer)
     event.setEventType(eventType);
     notifyBridgeClients(event);
+  if (this.hdfsStoreName != null) {
+    notifyGatewaySender(eventType, event);
+    }
     if(callDispatchListenerEvent){
       dispatchListenerEvent(eventType, event);
     }
@@ -5280,10 +5597,11 @@ public class LocalRegion extends AbstractRegion
     //to get Hash. If the partitioning column is different from primary key, 
     //the resolver for Sqlfabric is not able to obtain the hash object used for creation of KeyInfo  
      
-    final EntryEventImpl event = new EntryEventImpl(this, Operation.CREATE, key,
+    final EntryEventImpl event = EntryEventImpl.create(this, Operation.CREATE, key,
        value, callbackArg,  false /* origin remote */, client.getDistributedMember(),
         true /* generateCallbacks */,
         eventId);
+    try {
     event.setContext(client);
     
     // if this is a replayed operation or WAN event we may already have a version tag
@@ -5333,6 +5651,9 @@ public class LocalRegion extends AbstractRegion
       }
     }
     return success;
+    } finally {
+      event.release();
+    }
   }
 
   public boolean basicBridgePut(Object key, Object value, byte[] deltaBytes,
@@ -5350,11 +5671,12 @@ public class LocalRegion extends AbstractRegion
       }
     }
    
-    final EntryEventImpl event = new EntryEventImpl(this, Operation.UPDATE, key,
+    final EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
         null /* new value */, callbackArg,
         false /* origin remote */, memberId.getDistributedMember(),
         true /* generateCallbacks */,
         eventID);
+    try {
     event.setContext(memberId);
     event.setDeltaBytes(deltaBytes);
 
@@ -5404,6 +5726,9 @@ public class LocalRegion extends AbstractRegion
       this.stopper.checkCancelInProgress(null);
     }
     return success;
+    } finally {
+      event.release();
+    }
   }
 
   /**
@@ -5494,13 +5819,14 @@ public class LocalRegion extends AbstractRegion
 
       // Create an event and put the entry
       EntryEventImpl event =
-        new EntryEventImpl(this,
+        EntryEventImpl.create(this,
                            Operation.INVALIDATE,
                            key, null /* newValue */,
                            callbackArgument /* callbackArg*/,
                            true /*originRemote*/,
                            serverId
                            );
+      try {
 
       event.setVersionTag(versionTag);
       event.setFromServer(true);
@@ -5527,6 +5853,9 @@ public class LocalRegion extends AbstractRegion
               true);
         }
       }
+      } finally {
+        event.release();
+      }
     }
   }
 
@@ -5545,13 +5874,14 @@ public class LocalRegion extends AbstractRegion
 
       // Create an event and destroy the entry
       EntryEventImpl event =
-        new EntryEventImpl(this,
+        EntryEventImpl.create(this,
                            Operation.DESTROY,
                            key, null /* newValue */,
                            callbackArgument /* callbackArg*/,
                            true /*originRemote*/,
                            serverId
                            );
+      try {
       event.setFromServer(true);
       event.setVersionTag(versionTag);
       
@@ -5578,6 +5908,9 @@ public class LocalRegion extends AbstractRegion
           invokeDestroyCallbacks(EnumListenerEvent.AFTER_DESTROY, event, true, true);
         }
       }
+      } finally {
+        event.release();
+      }
     }
   }
 
@@ -5622,11 +5955,12 @@ public class LocalRegion extends AbstractRegion
     }
 
     // Create an event and put the entry
-    final EntryEventImpl event = new EntryEventImpl(this, Operation.DESTROY, key,
+    final EntryEventImpl event = EntryEventImpl.create(this, Operation.DESTROY, key,
         null /* new value */, callbackArg,
         false /* origin remote */, memberId.getDistributedMember(),
         true /* generateCallbacks */,
         clientEvent.getEventId());
+    try {
     event.setContext(memberId);
     // if this is a replayed or WAN operation we may already have a version tag
     event.setVersionTag(clientEvent.getVersionTag());
@@ -5641,6 +5975,9 @@ public class LocalRegion extends AbstractRegion
       clientEvent.isConcurrencyConflict(event.isConcurrencyConflict());
       clientEvent.setIsRedestroyedEntry(event.getIsRedestroyedEntry());
     }
+    } finally {
+      event.release();
+    }
   }
 
   
@@ -5658,11 +5995,12 @@ public class LocalRegion extends AbstractRegion
     }
 
     // Create an event and put the entry
-    final EntryEventImpl event = new EntryEventImpl(this, Operation.INVALIDATE, key,
+    final EntryEventImpl event = EntryEventImpl.create(this, Operation.INVALIDATE, key,
         null /* new value */, callbackArg,
         false /* origin remote */, memberId.getDistributedMember(),
         true /* generateCallbacks */,
         clientEvent.getEventId());
+    try {
     event.setContext(memberId);
     
     // if this is a replayed operation we may already have a version tag
@@ -5674,13 +6012,16 @@ public class LocalRegion extends AbstractRegion
       clientEvent.setVersionTag(event.getVersionTag());
       clientEvent.isConcurrencyConflict(event.isConcurrencyConflict());
     }
+    } finally {
+      event.release();
+    }
   }
 
   public void basicBridgeUpdateVersionStamp(Object key, Object p_callbackArg,
       ClientProxyMembershipID memberId, boolean fromClient, EntryEventImpl clientEvent) {
  
     // Create an event and update version stamp of the entry
-    EntryEventImpl event = new EntryEventImpl(this, Operation.UPDATE_VERSION_STAMP, key,
+    EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE_VERSION_STAMP, key,
         null /* new value */, null /*callbackArg*/,
         false /* origin remote */, memberId.getDistributedMember(),
         false /* generateCallbacks */,
@@ -5695,6 +6036,7 @@ public class LocalRegion extends AbstractRegion
     } finally {
       clientEvent.setVersionTag(event.getVersionTag());
       clientEvent.isConcurrencyConflict(event.isConcurrencyConflict());
+      event.release();
     }
   }
 
@@ -5770,7 +6112,7 @@ public class LocalRegion extends AbstractRegion
                      final boolean overwriteDestroyed)
   throws TimeoutException,
         CacheWriterException {
-    if (!InternalResourceManager.isLowMemoryExceptionDisabled()) {
+    if (!MemoryThresholds.isLowMemoryExceptionDisabled()) {
       checkIfAboveThreshold(event);
     }
     Operation originalOp = event.getOperation();
@@ -5800,7 +6142,7 @@ public class LocalRegion extends AbstractRegion
 
     if (mySRP != null && this.dataPolicy == DataPolicy.EMPTY) {
       if (originalOp == Operation.PUT_IF_ABSENT) {
-        return event.getOldValue() == null;
+        return !event.hasOldValue();
       }
       if (originalOp == Operation.REPLACE && !requireOldValue) {
         // LocalRegion.serverPut throws an EntryNotFoundException if the operation failed
@@ -5835,13 +6177,13 @@ public class LocalRegion extends AbstractRegion
    * @throws LowMemoryException if the target member for this operation is sick
    */
   private void checkIfAboveThreshold(final Object key) throws LowMemoryException{
-    if (heapThresholdReached.get()) {
-      Set<DistributedMember> htrm = getHeapThresholdReachedMembers();
+    if (memoryThresholdReached.get()) {
+      Set<DistributedMember> htrm = getMemoryThresholdReachedMembers();
 
       // #45603: trigger a background eviction since we're above the the critical 
       // threshold
-      InternalResourceManager.getInternalResourceManager(cache).triggerMemoryEvent();
-      
+      InternalResourceManager.getInternalResourceManager(cache).getHeapMonitor().updateStateAndSendEvent();
+
       Object[] prms = new Object[] {getFullPath(), key, htrm};
       throw new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_IN_0_FOR_PUT_1_MEMBER_2.toLocalizedString(prms),
           htrm);
@@ -6040,6 +6382,7 @@ public class LocalRegion extends AbstractRegion
     
     // Create updateTimeStampEvent from event.
     EntryEventImpl updateTimeStampEvent = EntryEventImpl.createVersionTagHolder(event.getVersionTag());
+    try {
     updateTimeStampEvent.setOperation(Operation.UPDATE_VERSION_STAMP);
     updateTimeStampEvent.setKeyInfo(event.getKeyInfo());
     updateTimeStampEvent.setGenerateCallbacks(false);
@@ -6064,6 +6407,9 @@ public class LocalRegion extends AbstractRegion
       updateTimeStampEvent.setRegion(event.getRegion());
       notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, updateTimeStampEvent);
     }
+    } finally {
+      updateTimeStampEvent.release();
+    }
   }
 
 
@@ -6233,6 +6579,13 @@ public class LocalRegion extends AbstractRegion
      return null;
    }
    
+   public VersionTag findVersionTagForGatewayEvent(EventID eventId) {
+     if (this.eventTracker != null) {
+       return this.eventTracker.findVersionTagForGateway(eventId);
+     }
+     return null;
+   }
+   
    /**
     * tries to find the version tag for a replayed client event
     * @param eventId
@@ -6354,6 +6707,33 @@ public class LocalRegion extends AbstractRegion
     }
   }
 
+  /**
+   * Returns true if this region notifies multiple serial gateways.
+   */
+  public boolean notifiesMultipleSerialGateways() {
+    if (isPdxTypesRegion()) {
+      return false;
+    }
+    int serialGatewayCount = 0;
+    Set<String> allGatewaySenderIds = getAllGatewaySenderIds();
+    if (!allGatewaySenderIds.isEmpty()) {
+      List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
+      if (allRemoteDSIds != null) {
+        for (GatewaySender sender : getCache().getAllGatewaySenders()) {
+          if (allGatewaySenderIds.contains(sender.getId())) {
+            if (!sender.isParallel()) {
+              serialGatewayCount++;
+              if (serialGatewayCount > 1) {
+                return true;
+              }
+            }
+          }
+        }
+      }
+    }
+    return false;
+  }
+  
   protected void notifyGatewaySender(EnumListenerEvent operation,
       EntryEventImpl event) {
     
@@ -6605,22 +6985,44 @@ public class LocalRegion extends AbstractRegion
   protected void postCreateRegion()
   {
     if (getEvictionAttributes().getAlgorithm().isLRUHeap()) {
+      final LogWriter logWriter = cache.getLogger();
+      float evictionPercentage = DEFAULT_HEAPLRU_EVICTION_HEAP_PERCENTAGE;
       // This is new to 6.5. If a heap lru region is created
-      // we make sure that the EvictionHeapPercentage is enabled.
+      // we make sure that the eviction percentage is enabled.
       InternalResourceManager rm = this.cache.getResourceManager();
-      if (!rm.hasEvictionThreshold()) { // fix for bug 42130
-        float chp = rm.getCriticalHeapPercentage();
-        if (chp > 0.0f) {
-          if (chp >= 10.0f) {
-            chp -= 5.0f;
+      if (!getOffHeap()) {
+        if (!rm.getHeapMonitor().hasEvictionThreshold()) {
+          float criticalPercentage = rm.getCriticalHeapPercentage();
+          if (criticalPercentage > 0.0f) {
+            if (criticalPercentage >= 10.f) {
+              evictionPercentage = criticalPercentage - 5.0f;
+            } else {
+              evictionPercentage = criticalPercentage;
+            }
+          }
+          rm.setEvictionHeapPercentage(evictionPercentage);
+          if (logWriter.fineEnabled()) {
+            logWriter.fine("Enabled heap eviction at " + evictionPercentage + " percent for LRU region");
+          }
+        }
+      } else {
+        if (!rm.getOffHeapMonitor().hasEvictionThreshold()) {
+          float criticalPercentage = rm.getCriticalOffHeapPercentage();
+          if (criticalPercentage > 0.0f) {
+            if (criticalPercentage >= 10.f) {
+              evictionPercentage = criticalPercentage - 5.0f;
+            } else {
+              evictionPercentage = criticalPercentage;
+            }
+          }
+          rm.setEvictionOffHeapPercentage(evictionPercentage);
+          if (logWriter.fineEnabled()) {
+            logWriter.fine("Enabled off-heap eviction at " + evictionPercentage + " percent for LRU region");
           }
-          rm.setEvictionHeapPercentage(chp);
-        } else {
-          rm.setEvictionHeapPercentage(DEFAULT_HEAPLRU_EVICTION_HEAP_PERCENTAGE);
         }
       }
     }
-    
+      
     if (!isInternalRegion()) {
       getCachePerfStats().incRegions(1);
       if (getMembershipAttributes().hasRequiredRoles()) {
@@ -6888,13 +7290,26 @@ public class LocalRegion extends AbstractRegion
    * @return an event for EVICT_DESTROY
    */
   protected EntryEventImpl generateEvictDestroyEvent(final Object key) {
-    EntryEventImpl event = new EntryEventImpl(
+    EntryEventImpl event = EntryEventImpl.create(
         this, Operation.EVICT_DESTROY, key, null/* newValue */,
         null, false, getMyId());
     // Fix for bug#36963
     if (generateEventID()) {
       event.setNewEventId(cache.getDistributedSystem());
     }
+    event.setFetchFromHDFS(false);
+    return event;
+  }
+    protected EntryEventImpl generateCustomEvictDestroyEvent(final Object key) {
+    EntryEventImpl event =  EntryEventImpl.create(
+        this, Operation.CUSTOM_EVICT_DESTROY, key, null/* newValue */,
+        null, false, getMyId());
+    
+    // Fix for bug#36963
+    if (generateEventID()) {
+      event.setNewEventId(cache.getDistributedSystem());
+    }
+    event.setFetchFromHDFS(false);
     return event;
   }
   
@@ -6921,6 +7336,8 @@ public class LocalRegion extends AbstractRegion
     }
     catch (EntryNotFoundException yetAnotherError) {
       throw new Error(LocalizedStrings.LocalRegion_ENTRYNOTFOUNDEXCEPTION_SHOULD_BE_MASKED_FOR_EVICTDESTROY.toLocalizedString(), yetAnotherError);
+    } finally {
+      event.release();
     }
   }
 
@@ -7271,16 +7688,17 @@ public class LocalRegion extends AbstractRegion
                 .setCacheListenerInvocationInProgress(true);
           }
         }
-
+        
         if (!GemFireCacheImpl.ASYNC_EVENT_LISTENERS) {
           dispatchEvent(this, event, op);
         }
         else {
+          final EventDispatcher ed = new EventDispatcher(event, op);
           try {
-            this.cache.getEventThreadPool().execute(
-                new EventDispatcher(event, op));
+            this.cache.getEventThreadPool().execute(ed);
           }
           catch (RejectedExecutionException rex) {
+            ed.release();
             logger.warn(LocalizedMessage.create(LocalizedStrings.LocalRegion_0_EVENT_NOT_DISPATCHED_DUE_TO_REJECTED_EXECUTION), rex);
           }
         }
@@ -7383,10 +7801,13 @@ public class LocalRegion extends AbstractRegion
   void cleanupFailedInitialization()
   {
     // mark as destroyed
+    // TODO OFFHEAP MERGE: to fix 49905 asif commented out isDestroyed being set.
+    // But in xd it was set after closeEntries was called.
+    // Here it is set before and it fixed 49555.
     this.isDestroyed = true;
     // after isDestroyed is set to true call removeResourceListener to fix bug 49555
     this.cache.getResourceManager(false).removeResourceListener(this);
-    this.entries.clear(null); //fixes bug 41333
+    closeEntries(); //fixes bug 41333
     this.destroyedSubregionSerialNumbers = collectSubregionSerialNumbers();
     try {
       if (this.eventTracker != null) {
@@ -7723,7 +8144,7 @@ public class LocalRegion extends AbstractRegion
    * @since prPersistSprint2
    */
   public void foreachRegionEntry(RegionEntryCallback callback) {
-    Iterator it = this.entries.regionEntries().iterator();
+    Iterator it = this.entries.regionEntriesInVM().iterator();
     while (it.hasNext()) {
       callback.handleRegionEntry((RegionEntry)it.next());
     }
@@ -7799,12 +8220,16 @@ public class LocalRegion extends AbstractRegion
     for (Iterator itr = keySet().iterator(); itr.hasNext();) {
       try {
         //EventID will not be generated by this constructor
-        EntryEventImpl event = new EntryEventImpl(
+        EntryEventImpl event = EntryEventImpl.create(
             this, op, itr.next() /*key*/,
             null/* newValue */, null/* callbackArg */, rgnEvent.isOriginRemote(),
             rgnEvent.getDistributedMember());
+        try {
         event.setLocalInvalid(!rgnEvent.getOperation().isDistributed());
         basicInvalidate(event, false);
+        } finally {
+          event.release();
+        }
       }
       catch (EntryNotFoundException e) {
         // ignore
@@ -7927,7 +8352,7 @@ public class LocalRegion extends AbstractRegion
                                isOverflowEnabled(), isDiskSynchronous(),
                                stats, getCancelCriterion(), this, getAttributes(),
                                diskFlags, "NO_PARTITITON", -1,
-                               getCompressor());
+                               getCompressor(), getOffHeap());
     } else {
       return null;
     }
@@ -8224,7 +8649,7 @@ public class LocalRegion extends AbstractRegion
 
     @Override
     public Object getValue() {
-      Object value = this.region.getDeserialized(getCheckedRegionEntry(), false, false, false);
+      Object value = this.region.getDeserialized(getCheckedRegionEntry(), false, false, false, false);
       if (value == null) {
         throw new EntryDestroyedException(getKey().toString());
       }
@@ -8576,7 +9001,7 @@ public class LocalRegion extends AbstractRegion
 
   /////////////////////// Transaction Helper Methods ////////////////////
 
-  public final TXStateInterface getTXState() {
+  public final TXStateProxy getTXState() {
     if (this.supportsTX) {
       return TXManagerImpl.getCurrentTXState();
     }
@@ -8767,6 +9192,7 @@ public class LocalRegion extends AbstractRegion
   protected final static void dispatchEvent(LocalRegion region,
       InternalCacheEvent event, EnumListenerEvent op)
   {
+    
     CacheListener[] listeners = region.fetchCacheListenersField();
     if (listeners == null || listeners.length == 0) {
       return;
@@ -8815,7 +9241,7 @@ public class LocalRegion extends AbstractRegion
           logger.error(LocalizedMessage.create(LocalizedStrings.LocalRegion_EXCEPTION_OCCURRED_IN_CACHELISTENER), t);
         }
       }
-    }
+    }    
   }
 
   /** ********************* Class EventDispatcher ***************************** */
@@ -8827,15 +9253,32 @@ public class LocalRegion extends AbstractRegion
     EnumListenerEvent op;
 
     EventDispatcher(InternalCacheEvent event, EnumListenerEvent op) {
+                
+      if (LocalRegion.this.offHeap && event instanceof EntryEventImpl) {
+        // Make a copy that has its own off-heap refcount so fix bug 48837
+        event = new EntryEventImpl( (EntryEventImpl)event);   
+      }
       this.event = event;
       this.op = op;
     }
 
     public void run()
     {
-      dispatchEvent(LocalRegion.this, this.event, this.op);
+      try {
+        dispatchEvent(LocalRegion.this, this.event, this.op);
+      }finally {
+        this.release();
+      }
+    }
+    
+    public void release() {
+      if (LocalRegion.this.offHeap && this.event instanceof EntryEventImpl) {
+        ((EntryEventImpl)this.event).release();
+      }      
     }
   }
+  
+  
 
   /** ******************* Class SubregionsSet ********************************* */
 
@@ -9048,7 +9491,7 @@ public class LocalRegion extends AbstractRegion
 
     public Object getValue(boolean ignoreCopyOnRead)
     {
-        Object value = getDeserialized(this.basicGetEntry(), false, ignoreCopyOnRead, false);
+        Object value = getDeserialized(this.basicGetEntry(), false, ignoreCopyOnRead, false, false);
         if (value == null) {
           throw new EntryDestroyedException(getKey().toString());
         }
@@ -9203,6 +9646,22 @@ public class LocalRegion extends AbstractRegion
   }
 
   /**
+   * returns an estimate of the number of entries in this region. This method
+   * should be prefered over size() for hdfs regions where an accurate size is
+   * not needed. This method is not supported on a client
+   * 
+   * @return the estimated size of this region
+   */
+  public int sizeEstimate() {
+    boolean isClient = this.imageState.isClient();
+    if (isClient) {
+      throw new UnsupportedOperationException(
+          "Method not supported on a client");
+    }
+    return entryCount(null, true);
+  }
+
+  /**
    * This method returns true if Region is Empty.
    */
   public boolean isEmpty()
@@ -9466,7 +9925,7 @@ public class LocalRegion extends AbstractRegion
       // Now remove the tranxnl entries for this region
       this.txClearRegion();
       // Now clear the map of committed entries
-      Set<VersionSource> remainingIDs = this.entries.clear(rvv);
+      Set<VersionSource> remainingIDs = clearEntries(rvv);
       if (!this.dataPolicy.withPersistence()) { // persistent regions do not reap IDs
         if (myVector != null) {
           myVector.removeOldMembers(remainingIDs);
@@ -9474,6 +9933,8 @@ public class LocalRegion extends AbstractRegion
       }
     }
     
+    clearHDFSData();
+    
     if (!isProxy()) {
       // Now we need to recreate all the indexes.
       //If the indexManager is null we don't have to worry
@@ -9512,6 +9973,11 @@ public class LocalRegion extends AbstractRegion
     }
   }
 
+  /**Clear HDFS data, if present */
+  protected void clearHDFSData() {
+    //do nothing, clear is implemented for subclasses like BucketRegion.
+  }
+
   @Override
   void basicLocalClear(RegionEventImpl rEvent)
   {
@@ -9528,6 +9994,7 @@ public class LocalRegion extends AbstractRegion
   @Override
   Map basicGetAll(Collection keys, Object callback) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
+    final boolean isTraceEnabled = logger.isTraceEnabled();
     
     if (isDebugEnabled) {
       logger.debug("Processing getAll request for: {}", keys);
@@ -9606,9 +10073,10 @@ public class LocalRegion extends AbstractRegion
           // correct events will be delivered to any callbacks we have.
           long startPut = CachePerfStats.getStatTime();
           validateKey(key);
-          EntryEventImpl event = new EntryEventImpl(
+          EntryEventImpl event = EntryEventImpl.create(
               this, Operation.LOCAL_LOAD_CREATE, key, value,
               callback, false, getMyId(), true);
+          try {
           event.setFromServer(true);
           event.setVersionTag(entry.getVersionTag());
 
@@ -9628,10 +10096,13 @@ public class LocalRegion extends AbstractRegion
 
           if (!createTombstone) {
             allResults.put(key, value);
-            if (isDebugEnabled) {
-              logger.debug("Added remote result for getAll request: {}, {}", key, value);
+            if (isTraceEnabled) {
+              logger.trace("Added remote result for getAll request: {}, {}", key, value);
             }
           }
+          } finally {
+            event.release();
+          }
         }
       }
     } else {
@@ -9666,7 +10137,7 @@ public class LocalRegion extends AbstractRegion
       if (mapEntry.getValue() == null || key == null) {
         throw new NullPointerException("Any key or value in putAll should not be null");
       }
-      if (!InternalResourceManager.isLowMemoryExceptionDisabled()) {
+      if (!MemoryThresholds.isLowMemoryExceptionDisabled()) {
         checkIfAboveThreshold(key);
       }
       // Threshold check should not be performed again 
@@ -9697,17 +10168,25 @@ public class LocalRegion extends AbstractRegion
       callbackArg = new GatewaySenderEventCallbackArgument(callbackArg);
     }
   
-    final EntryEventImpl event = new EntryEventImpl(this, Operation.PUTALL_CREATE, null,
+    final EntryEventImpl event = EntryEventImpl.create(this, Operation.PUTALL_CREATE, null,
         null /* new value */, callbackArg,
         false /* origin remote */, memberId.getDistributedMember(),
         !skipCallbacks /* generateCallbacks */,
         eventId);
+    try {
     event.setContext(memberId);
     DistributedPutAllOperation putAllOp = new DistributedPutAllOperation(event,
         map.size(), true);
+    try {
     VersionedObjectList result = basicPutAll(map, putAllOp, retryVersions);
     getCachePerfStats().endPutAll(startPut);
     return result;
+    } finally {
+      putAllOp.freeOffHeapResources();
+    }
+    } finally {
+      event.release();
+    }
   }
   
   /**
@@ -9725,28 +10204,44 @@ public class LocalRegion extends AbstractRegion
       callbackArg = new GatewaySenderEventCallbackArgument(callbackArg);
     }
   
-    final EntryEventImpl event = new EntryEventImpl(this, Operation.REMOVEALL_DESTROY, null,
+    final EntryEventImpl event = EntryEventImpl.create(this, Operation.REMOVEALL_DESTROY, null,
         null /* new value */, callbackArg,
         false /* origin remote */, memberId.getDistributedMember(),
         true /* generateCallbacks */,
         eventId);
+    try {
     event.setContext(memberId);
     DistributedRemoveAllOperation removeAllOp = new DistributedRemoveAllOperation(event, keys.size(), true);
+    try {
     VersionedObjectList result = basicRemoveAll(keys, removeAllOp, retryVersions);
     getCachePerfStats().endRemoveAll(startOp);
     return result;
+    } finally {
+      removeAllOp.freeOffHeapResources();
+    }
+    } finally {
+      event.release();
+    }
   }
   
   public VersionedObjectList basicImportPutAll(Map map, boolean skipCallbacks) {
     long startPut = CachePerfStats.getStatTime();
 
     // generateCallbacks == false
-    EntryEventImpl event = new EntryEventImpl(this, Operation.PUTALL_CREATE,
+    EntryEventImpl event = EntryEventImpl.create(this, Operation.PUTALL_CREATE,
         null, null, null, true, getMyId(), !skipCallbacks);
+    try {
     DistributedPutAllOperation putAllOp = new DistributedPutAllOperation(event, map.size(), false);
+    try {
     VersionedObjectList result = basicPutAll(map, putAllOp, null);
     getCachePerfStats().endPutAll(startPut);
     return result;
+    } finally {
+      putAllOp.freeOffHeapResources();
+    }
+    } finally {
+      event.release();
+    }
   }
 
   @Override
@@ -9754,7 +10249,12 @@ public class LocalRegion extends AbstractRegion
     long startPut = CachePerfStats.getStatTime();
     final DistributedPutAllOperation putAllOp = newPutAllOperation(map, callbackArg);
     if (putAllOp != null) {
+      try {
       basicPutAll(map, putAllOp, null);
+      } finally {
+        putAllOp.getBaseEvent().release();
+        putAllOp.freeOffHeapResources();
+      }
     }
     
     getCachePerfStats().endPutAll(startPut);
@@ -9775,7 +10275,12 @@ public class LocalRegion extends AbstractRegion
     final long startOp = CachePerfStats.getStatTime();
     final DistributedRemoveAllOperation op = newRemoveAllOperation(keys, callbackArg);
     if (op != null) {
+      try {
       basicRemoveAll(keys, op, null);
+      } finally {
+        op.getBaseEvent().release();
+        op.freeOffHeapResources();
+      }
     }
     getCachePerfStats().endRemoveAll(startOp);
   }
@@ -9919,7 +10424,11 @@ public class LocalRegion extends AbstractRegion
               }
               
               if (!overwritten) {
+                try {
                   basicEntryPutAll(key, value, dpao, offset, tagHolder);
+                } finally {
+                  tagHolder.release();
+                }
               }
               // now we must check again since the cache may have closed during
               // distribution (causing this process to not receive and queue the
@@ -9978,6 +10487,8 @@ public class LocalRegion extends AbstractRegion
       e = new RuntimeException(ex);
     } finally {
       unlockRVVForBulkOp();
+      putAllOp.getBaseEvent().release();
+      putAllOp.freeOffHeapResources();
     }
     getDataView().postPutAll(putAllOp, succeeded, this);
     if (e != null) {
@@ -10187,6 +10698,8 @@ public class LocalRegion extends AbstractRegion
       e = new RuntimeException(ex);
     } finally {
       unlockRVVForBulkOp();
+      removeAllOp.getBaseEvent().release();
+      removeAllOp.freeOffHeapResources();
     }
     getDataView().postRemoveAll(removeAllOp, succeeded, this);
     if (e != null) {
@@ -10232,10 +10745,20 @@ public class LocalRegion extends AbstractRegion
     // Create a dummy event for the PutAll operation.  Always create a
     // PutAll operation, even if there is no distribution, so that individual
     // events can be tracked and handed off to callbacks in postPutAll
-    final EntryEventImpl event = new EntryEventImpl(this,
+    final EntryEventImpl event = EntryEventImpl.create(this,
         Operation.PUTALL_CREATE, null, null, callbackArg, true, getMyId());
-    return new DistributedPutAllOperation(event, map.size(), false);
+
+    event.disallowOffHeapValues();
+    DistributedPutAllOperation dpao = new DistributedPutAllOperation(event, map.size(), false);
+    return dpao;
+  }
+    public final DistributedPutAllOperation newPutAllForPUTDmlOperation(Map<?, ?> map, Object callbackArg) {
+    DistributedPutAllOperation dpao = newPutAllOperation(map, callbackArg);
+    dpao.getEvent().setFetchFromHDFS(false);
+    dpao.getEvent().setPutDML(true);
+    return dpao;
   }
+
   
   public final DistributedRemoveAllOperation newRemoveAllOperation(Collection<?> keys, Object callbackArg) {
     if (keys == null) {
@@ -10251,8 +10774,9 @@ public class LocalRegion extends AbstractRegion
     // Create a dummy event for the removeAll operation.  Always create a
     // removeAll operation, even if there is no distribution, so that individual
     // events can be tracked and handed off to callbacks in postRemoveAll
-    final EntryEventImpl event = new EntryEventImpl(this, Operation.REMOVEALL_DESTROY, null,
+    final EntryEventImpl event = EntryEventImpl.create(this, Operation.REMOVEALL_DESTROY, null,
         null/* newValue */, callbackArg, false, getMyId());
+    event.disallowOffHeapValues();
     return new DistributedRemoveAllOperation(event, keys.size(), false);
   }
 
@@ -10283,6 +10807,11 @@ public class LocalRegion extends AbstractRegion
     // will be changed to a PUTALL_UPDATE later on.
     EntryEventImpl event = EntryEventImpl.createPutAllEvent(
         putallOp, this, Operation.PUTALL_CREATE, key, value);
+
+    try {
+	event.setFetchFromHDFS(putallOp.getEvent().isFetchFromHDFS());
+    event.setPutDML(putallOp.getEvent().isPutDML());
+    
     if (tagHolder != null) {
       event.setVersionTag(tagHolder.getVersionTag());
       event.setFromServer(tagHolder.isFromServer());
@@ -10300,6 +10829,9 @@ public class LocalRegion extends AbstractRegion
       tagHolder.setVersionTag(event.getVersionTag());
       tagHolder.isConcurrencyConflict(event.isConcurrencyConflict());
     }
+    } finally {
+      event.release();
+    }
   }
   
   
@@ -10312,6 +10844,7 @@ public class LocalRegion extends AbstractRegion
     checkReadiness();
     validateKey(key);
     EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, this, key);
+    try {
     if (tagHolder != null) {
       event.setVersionTag(tagHolder.getVersionTag());
       event.setFromServer(tagHolder.isFromServer());
@@ -10337,6 +10870,9 @@ public class LocalRegion extends AbstractRegion
       tagHolder.setVersionTag(event.getVersionTag());
       tagHolder.isConcurrencyConflict(event.isConcurrencyConflict());
     }
+    } finally {
+      event.release();
+    }
   }
   public void performPutAllEntry(EntryEventImpl event) {
     getDataView().putEntry(event, false, false, null, false, 0L, false);
@@ -10929,9 +11465,9 @@ public class LocalRegion extends AbstractRegion
   public ResultCollector executeFunction(final DistributedRegionFunctionExecutor execution, final Function function, final Object args,
       final ResultCollector rc,final Set filter, final ServerToClientFunctionResultSender sender) {   
 
-    if (function.optimizeForWrite() && heapThresholdReached.get() &&
-        !InternalResourceManager.isLowMemoryExceptionDisabled()) {
-      Set<DistributedMember> htrm = getHeapThresholdReachedMembers();
+    if (function.optimizeForWrite() && memoryThresholdReached.get() &&
+        !MemoryThresholds.isLowMemoryExceptionDisabled()) {
+      Set<DistributedMember> htrm = getMemoryThresholdReachedMembers();
       throw new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1.toLocalizedString(
           new Object[] {function.getId(), htrm}), htrm);
     }
@@ -10952,7 +11488,7 @@ public class LocalRegion extends AbstractRegion
   /**
    * @return the set of members which are known to be critical
    */
-  public Set<DistributedMember> getHeapThresholdReachedMembers() {
+  public Set<DistributedMember> getMemoryThresholdReachedMembers() {
     return Collections.<DistributedMember> singleton(this.cache.getMyId());
   }
 
@@ -10963,18 +11499,21 @@ public class LocalRegion extends AbstractRegion
     if (logger.isDebugEnabled()) {
       logger.debug("Region:{} received a Memory event.{}", this, event);
     }
-    setHeapThresholdFlag(event);
+    setMemoryThresholdFlag(event);
   }
 
-  protected void setHeapThresholdFlag(MemoryEvent event) {
+  protected void setMemoryThresholdFlag(MemoryEvent event) {
     assert getScope().isLocal();
     if (event.isLocal()) {
-      if (event.getType().isCriticalUp()) {
-        //start rejecting operations
-        heapThresholdReached.set(true);
-      } else if (event.getType().isCriticalDown() || event.getType().isCriticalDisabled()) {
-        //stop rejecting operations
-        heapThresholdReached.set(false);
+      if (event.getState().isCritical()
+          && !event.getPreviousState().isCritical()
+          && (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
+        // start rejecting operations
+        memoryThresholdReached.set(true);
+      } else if (!event.getState().isCritical()
+          && event.getPreviousState().isCritical()
+          && (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
+        memoryThresholdReached.set(false);
       }
     }
   }
@@ -11030,28 +11569,32 @@ public class LocalRegion extends AbstractRegion
 
   /**
    * Initialize the set of remote members whose memory state is critical.  This is
-   * called when registering using {@link InternalResourceManager#addResourceListener(ResourceListener)}.
+   * called when registering using {@link InternalResourceManager#addResourceListener(ResourceType, ResourceListener)}.
    * It should only be called once and very early in this region's lifetime.
    *
-   * @param localHeapIsCritical true if the local heap is in a critical state
-   * @param critialMembers set of members whose heaps are in a critical state
-   * @see ResourceManager#setCriticalHeapPercentage(float)
+   * @param localMemoryIsCritical true if the local memory is in a critical state
+   * @param critialMembers set of members whose memory is in a critical state
+   * @see ResourceManager#setCriticalHeapPercentage(float) and ResourceManager#setCriticalOffHeapPercentage(float)
    * @since 6.0
    */
-  public void initialCriticalMembers(boolean localHeapIsCritical,
+  public void initialCriticalMembers(boolean localMemoryIsCritical,
       Set<InternalDistributedMember> critialMembers) {
     assert getScope().isLocal();
-    if (localHeapIsCritical) {
-      heapThresholdReached.set(true);
+    if (localMemoryIsCritical) {
+      memoryThresholdReached.set(true);
     }
   }
   
   public void destroyRecoveredEntry(Object key) {
-    EntryEventImpl event = new EntryEventImpl(
+    EntryEventImpl event = EntryEventImpl.create(
         this,
         Operation.LOCAL_DESTROY, key, null, null, false, getMyId(), false);
+    try {
     event.inhibitCacheListenerNotification(true);
     mapDestroy(event, true, false, null, false, true);
+    } finally {
+      event.release();
+    }
   }
   public boolean lruLimitExceeded() {
     return this.entries.lruLimitExceeded();
@@ -11773,7 +12316,7 @@ public class LocalRegion extends AbstractRegion
      // was modified to call the other EntryEventImpl constructor so that
      // an id will be generated by default. Null was passed in anyway.
      //   generate EventID
-     EntryEventImpl event = new EntryEventImpl(
+     EntryEventImpl event = EntryEventImpl.create(
          this, Operation.PUT_IF_ABSENT, key,
          value, callbackArgument, false, getMyId());
      final Object oldValue = null;
@@ -11799,6 +12342,8 @@ public class LocalRegion extends AbstractRegion
        }
      } catch (EntryNotFoundException e) {
        return event.getOldValue();
+     } finally {
+       event.release();
      }
    }
           
@@ -11832,7 +12377,7 @@ public class LocalRegion extends AbstractRegion
     if (value == null) {
       value = Token.INVALID;
     }
-    EntryEventImpl event = new EntryEventImpl(this,
+    EntryEventImpl event = EntryEventImpl.create(this,
                                               Operation.REMOVE,
                                               key,
                                               null, // newValue
@@ -11860,6 +12405,8 @@ public class LocalRegion extends AbstractRegion
       } else {
         throw rde;
       }
+    } finally {
+      event.release();
     }
     return true;
   }
@@ -11887,7 +12434,7 @@ public class LocalRegion extends AbstractRegion
     validateArguments(key, newValue, callbackArg);
     checkReadiness();
     checkForLimitedOrNoAccess();
-    EntryEventImpl event = new EntryEventImpl(this,
+    EntryEventImpl event = EntryEventImpl.create(this,
                                               Operation.REPLACE,
                                               key,
                                               newValue,
@@ -11927,6 +12474,8 @@ public class LocalRegion extends AbstractRegion
       }
     } catch (EntryNotFoundException e) {  // put failed on server
       return false;
+    } finally {
+      event.release();
     }
   }
   
@@ -11953,7 +12502,7 @@ public class LocalRegion extends AbstractRegion
     validateArguments(key, value, callbackArg);
     checkReadiness();
     checkForLimitedOrNoAccess();


<TRUNCATED>

Mime
View raw message