geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [20/57] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Thu, 09 Jul 2015 17:02:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 929f476..bc164a8 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -85,12 +85,23 @@ import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
 import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
 import com.gemstone.gemfire.cache.TransactionException;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
 import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionContext;
 import com.gemstone.gemfire.cache.execute.FunctionException;
 import com.gemstone.gemfire.cache.execute.FunctionService;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet.HDFSIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
 import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
 import com.gemstone.gemfire.cache.query.FunctionDomainException;
@@ -149,8 +160,11 @@ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile
 import com.gemstone.gemfire.internal.cache.DestroyPartitionedRegionMessage.DestroyPartitionedRegionResponse;
 import com.gemstone.gemfire.internal.cache.DistributedRegion.DiskPage;
 import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
 import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
 import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
 import com.gemstone.gemfire.internal.cache.execute.FunctionExecutionNodePruner;
 import com.gemstone.gemfire.internal.cache.execute.FunctionRemoteContext;
@@ -169,6 +183,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage.DestroyResponse;
+import com.gemstone.gemfire.internal.cache.partitioned.DestroyRegionOnDataStoreMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.DumpAllPRConfigMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion;
 import com.gemstone.gemfire.internal.cache.partitioned.DumpB2NRegion.DumpB2NResponse;
@@ -197,6 +212,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator;
 import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
 import com.gemstone.gemfire.internal.cache.partitioned.PRSanityCheckMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor;
 import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PRUpdateEntryVersionMessage.UpdateEntryVersionResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage.PartitionResponse;
@@ -234,6 +250,8 @@ import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
 import com.gemstone.gemfire.internal.util.TransformUtils;
 import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
@@ -691,9 +709,17 @@ public class PartitionedRegion extends LocalRegion implements
   private final PartitionListener[] partitionListeners;
 
   private boolean isShadowPR = false;
+  private boolean isShadowPRForHDFS = false;
   
   private AbstractGatewaySender parallelGatewaySender = null;
   
+  private final ThreadLocal<Boolean> queryHDFS = new ThreadLocal<Boolean>() {
+    @Override
+    protected Boolean initialValue() {
+      return false;
+    }
+  };
+  
   public PartitionedRegion(String regionname, RegionAttributes ra,
       LocalRegion parentRegion, GemFireCacheImpl cache,
       InternalRegionArguments internalRegionArgs) {
@@ -712,6 +738,12 @@ public class PartitionedRegion extends LocalRegion implements
     // distributed system disconnect even this (or other) PRs are destroyed
     // (which prevents pridmap cleanup).
     cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener);
+    
+    // add an async queue for the region if the store name is not null. 
+    if (this.getHDFSStoreName() != null) {
+      String eventQueueName = getHDFSEventQueueName();
+      super.addAsyncEventQueueId(eventQueueName);
+    }
 
     // this.userScope = ra.getScope();
     this.partitionAttributes = ra.getPartitionAttributes();
@@ -790,6 +822,8 @@ public class PartitionedRegion extends LocalRegion implements
     if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) {
       this.isShadowPR = true;
       this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender();
+      if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue())
+        this.isShadowPRForHDFS = true;
     }
     
     
@@ -832,10 +866,38 @@ public class PartitionedRegion extends LocalRegion implements
       }      
     });
   }
-  
-  public boolean isShadowPR() {
+
+  @Override
+  public final boolean isHDFSRegion() {
+    return this.getHDFSStoreName() != null;
+  }
+
+  @Override
+  public final boolean isHDFSReadWriteRegion() {
+    return isHDFSRegion() && !getHDFSWriteOnly();
+  }
+
+  @Override
+  protected final boolean isHDFSWriteOnly() {
+    return isHDFSRegion() && getHDFSWriteOnly();
+  }
+
+  public final void setQueryHDFS(boolean includeHDFS) {
+    queryHDFS.set(includeHDFS);
+  }
+
+  @Override
+  public final boolean includeHDFSResults() {
+    return queryHDFS.get();
+  }
+
+  public final boolean isShadowPR() {
     return isShadowPR;
   }
+
+  public final boolean isShadowPRForHDFS() {
+    return isShadowPRForHDFS;
+  }
   
   public AbstractGatewaySender getParallelGatewaySender() {
     return parallelGatewaySender;
@@ -978,7 +1040,7 @@ public class PartitionedRegion extends LocalRegion implements
 
     if (!this.isDestroyed && !this.isLocallyDestroyed) {
       // Register at this point so that other members are known
-      this.cache.getResourceManager().addResourceListener(this);
+      this.cache.getResourceManager().addResourceListener(ResourceType.MEMORY, this);
     }
     
     // Create OQL indexes before starting GII.
@@ -1602,7 +1664,7 @@ public class PartitionedRegion extends LocalRegion implements
       try {
         final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId());
         if (loc) {
-          ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones);
+          ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true);
         } else {
           ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones);
           // TODO:Suranjan&Yogesh : there should be better way than this one
@@ -1924,7 +1986,7 @@ public class PartitionedRegion extends LocalRegion implements
       this.getCancelCriterion().checkCancelInProgress(null);
       boolean interrupted = Thread.interrupted();
       try {
-        prqe.queryBuckets(null);
+        results = prqe.queryBuckets(null);
         break;
       }
       catch (InterruptedException e) {
@@ -1968,7 +2030,7 @@ public class PartitionedRegion extends LocalRegion implements
       ObjectType elementType = results.getCollectionType().getElementType();
       if (selectExpr.getOrderByAttrs() != null) {
         // Set limit also, its not applied while building the final result set as order by is involved.
-        results = new ResultsCollectionWrapper(elementType, results.asSet(), query.getLimit(parameters));
+       // results = new ResultsCollectionWrapper(elementType, results.asSet(), query.getLimit(parameters));
       } else if (allowsDuplicates) {
         results = new ResultsCollectionWrapper(elementType, results.asSet());
       }
@@ -2061,7 +2123,8 @@ public class PartitionedRegion extends LocalRegion implements
           bucketStorageAssigned=false;
           // if this is a Delta update, then throw exception since the key doesn't
           // exist if there is no bucket for it yet
-          if (event.hasDelta()) {
+          // For HDFS region, we will recover key, so allow bucket creation
+          if (!this.dataPolicy.withHDFS() && event.hasDelta()) {
             throw new EntryNotFoundException(LocalizedStrings.
               PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY
                 .toLocalizedString());
@@ -2239,6 +2302,7 @@ public class PartitionedRegion extends LocalRegion implements
       throw new CacheClosedException("Cache is shutting down");
     }
 
+    try {
     final long startTime = PartitionedRegionStats.startTime();
     // build all the msgs by bucketid
     HashMap prMsgMap = putallO.createPRMessages();
@@ -2279,7 +2343,11 @@ public class PartitionedRegion extends LocalRegion implements
           logger.debug("PR.postPutAll encountered exception at sendMsgByBucket, ",ex);
         }
         EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
+        try {
           partialKeys.saveFailedKey(firstEvent.getKey(), ex);
+        } finally {
+          firstEvent.release();
+        }
       }
       if (isDebugEnabled) {
         long now = System.currentTimeMillis();
@@ -2313,7 +2381,21 @@ public class PartitionedRegion extends LocalRegion implements
         }
       }
     } 
-  } 
+    } finally {
+      /*
+// TODO XD OFFHEAP MERGE: do we have any events that need freeOffHeapReferences
+      for (PutAllPRMessage.PutAllResponse resp : responses) {
+        PutAllPRMessage.PRMsgResponseContext ctx = resp.getContextObject();
+        if (ctx != null) {
+          EntryEventImpl e = ctx.getEvent();
+          if (e != null) {
+            e.release();
+          }
+        }
+      }
+      */
+    }
+  }
   @Override
   public void postRemoveAllSend(DistributedRemoveAllOperation op, VersionedObjectList successfulOps) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -2362,7 +2444,11 @@ public class PartitionedRegion extends LocalRegion implements
           logger.debug("PR.postRemoveAll encountered exception at sendMsgByBucket, ",ex);
         }
         EntryEventImpl firstEvent = prMsg.getFirstEvent(this);
+        try {
           partialKeys.saveFailedKey(firstEvent.getKey(), ex);
+        } finally {
+          firstEvent.release();
+        }
       }
       if (isDebugEnabled) {
         long now = System.currentTimeMillis();
@@ -2405,6 +2491,7 @@ public class PartitionedRegion extends LocalRegion implements
     
     // retry the put remotely until it finds the right node managing the bucket
     EntryEventImpl event = prMsg.getFirstEvent(this);
+    try {
     RetryTimeKeeper retryTime = null;
     InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId.intValue(), null);
     if (isDebugEnabled) {
@@ -2528,6 +2615,11 @@ public class PartitionedRegion extends LocalRegion implements
       }
       this.prStats.incPutAllRetries();
     } // for
+    } finally {
+      if (event != null) {
+        event.release();
+      }
+    }
     // NOTREACHED
   }
 
@@ -2861,13 +2953,6 @@ public class PartitionedRegion extends LocalRegion implements
         }
         checkIfAboveThreshold(event);
         if (isLocal) {
-          // Local updates should insert a serialized (aka CacheDeserializable) object
-          // given that most manipulation of values is remote (requiring serialization to send).
-          // But... function execution always implies local manipulation of
-          // values so keeping locally updated values in Object form should be more efficient.
-          if (! DistributionManager.isFunctionExecutionThread.get().booleanValue()) {
-            event.makeSerializedNewValue();
-          }
 //          final boolean cacheWrite = !event.isOriginRemote()
 //              && !event.isNetSearch();
 //          if (cacheWrite) {
@@ -2875,9 +2960,18 @@ public class PartitionedRegion extends LocalRegion implements
 //          }
           event.setInvokePRCallbacks(true);
           long start = this.prStats.startPutLocal();
-          try {          
+          try {   
+            final BucketRegion br = this.dataStore.getInitializedBucketForId(event.getKey(), bucketId);
+            // Local updates should insert a serialized (aka CacheDeserializable) object
+            // given that most manipulation of values is remote (requiring serialization to send).
+            // But... function execution always implies local manipulation of
+            // values so keeping locally updated values in Object form should be more efficient.
+            if (! DistributionManager.isFunctionExecutionThread.get().booleanValue()) {
+              // TODO: this condition may not help since BucketRegion.virtualPut calls forceSerialized
+              br.forceSerialized(event);
+            }
           if (ifNew) {
-            result = this.dataStore.createLocally(bucketId,
+            result = this.dataStore.createLocally(br,
                                                   event,
                                                   ifNew,
                                                   ifOld,
@@ -2885,7 +2979,7 @@ public class PartitionedRegion extends LocalRegion implements
                                                   lastModified);
           }
           else {
-            result = this.dataStore.putLocally(bucketId,
+            result = this.dataStore.putLocally(br,
                                                event,
                                                ifNew, 
                                                ifOld,
@@ -3083,10 +3177,9 @@ public class PartitionedRegion extends LocalRegion implements
    * @return the size of the serialized entry
    */
   private static int getEntrySize(EntryEventImpl eei) {
-    final CachedDeserializable vcd = (CachedDeserializable)eei
-        .getSerializedNewValue();
-    if (vcd != null) {
-      return vcd.getSizeInBytes();
+    @Unretained final Object v = eei.getRawNewValue();
+    if (v instanceof CachedDeserializable) {
+      return ((CachedDeserializable)v).getSizeInBytes();
     }
     return 0;
   }
@@ -3222,7 +3315,7 @@ public class PartitionedRegion extends LocalRegion implements
     */
    @Override Object nonTxnFindObject(KeyInfo keyInfo, boolean isCreate,
       boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD,
-      EntryEventImpl clientEvent, boolean returnTombstones) 
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) 
       throws TimeoutException, CacheLoaderException
   {
     Object result = null;
@@ -3241,9 +3334,18 @@ public class PartitionedRegion extends LocalRegion implements
             } else {
               result = cd.getDeserializedForReading();
             }
+            
           } else if (!disableCopyOnRead) {
             result = conditionalCopy(result);
           }
+          
+          // For sqlf the deserialized value is just a Chunk, so
+          // increase its use count before returning the found value
+         /* if(GemFireCacheImpl.sqlfSystem() && result instanceof Chunk) {
+            if(!((Chunk)result).use()) {
+              return null;
+            }
+          }*/
            // what was a miss is now a hit
           RegionEntry re = null;
           if (isCreate) {
@@ -3268,10 +3370,14 @@ public class PartitionedRegion extends LocalRegion implements
     }
     try {
       result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
-          localValue, disableCopyOnRead, preferCD, null, null, false);
+          localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
     }
     finally {
-      thisFuture.set(result);
+      if (result instanceof Chunk) {
+        thisFuture.set(null);
+      } else {
+        thisFuture.set(result);
+      }
       this.getFutures.remove(keyInfo.getKey());
     }
     return result;
@@ -3283,7 +3389,7 @@ public class PartitionedRegion extends LocalRegion implements
   public Object get(Object key, Object aCallbackArgument,
       boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
       ClientProxyMembershipID requestingClient,
-      EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException
   {
     validateKey(key);
     validateCallbackArg(aCallbackArgument);
@@ -3297,7 +3403,7 @@ public class PartitionedRegion extends LocalRegion implements
       // if scope is local and there is no loader, then
       // don't go further to try and get value
       Object value = getDataView().findObject(getKeyInfo(key, aCallbackArgument), this, true/*isCreate*/, generateCallbacks,
-                                      null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+                                      null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
       if (value != null && !Token.isInvalid(value)) {
         miss = false;
       }
@@ -3343,7 +3449,7 @@ public class PartitionedRegion extends LocalRegion implements
     if (primary == null) {
       return null;
     }
-    if (isTX()) {
+    if (isTX() || this.hdfsStoreName != null) {
       return getNodeForBucketWrite(bucketId, null);
     }
     InternalDistributedMember result =  getRegionAdvisor().getPreferredNode(bucketId);
@@ -3357,7 +3463,7 @@ public class PartitionedRegion extends LocalRegion implements
    */
   private InternalDistributedMember getNodeForBucketReadOrLoad(int bucketId) {
     InternalDistributedMember targetNode;
-    if (!this.haveCacheLoader) {
+    if (!this.haveCacheLoader && (this.hdfsStoreName == null)) {
       targetNode = getNodeForBucketRead(bucketId);
     }
     else {
@@ -3492,7 +3598,7 @@ public class PartitionedRegion extends LocalRegion implements
   @Override
   protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
       TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
-      EntryEventImpl clientEvent, boolean returnTombstones)
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
       throws CacheLoaderException, TimeoutException
   {
     Object obj = null;
@@ -3506,7 +3612,21 @@ public class PartitionedRegion extends LocalRegion implements
             isCreate ? Operation.CREATE : null, key, null, aCallbackArgument);
         keyInfo.setBucketId(bucketId);
       }
-      InternalDistributedMember targetNode = getNodeForBucketReadOrLoad(bucketId);
+      InternalDistributedMember targetNode = null;
+      TXStateProxy txState = getTXState();
+      boolean allowRetry;
+      if (txState != null) {
+        if (txState.isRealDealLocal()) {
+          targetNode = getMyId();
+        } else {
+          targetNode = (InternalDistributedMember) txState.getTarget();
+          assert targetNode != null;
+        }
+        allowRetry = false;
+      } else {
+        targetNode = getNodeForBucketReadOrLoad(bucketId);
+        allowRetry = true;
+      }
       if (targetNode == null) {
         if (logger.isDebugEnabled()) {
           logger.debug("No need to create buckets on get(), no CacheLoader configured.");
@@ -3514,7 +3634,7 @@ public class PartitionedRegion extends LocalRegion implements
         return null;
       }
       
-      obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+      obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry, allowReadFromHDFS);
     }
     finally {
       this.prStats.endGet(startTime);
@@ -3762,8 +3882,9 @@ public class PartitionedRegion extends LocalRegion implements
     InternalDistributedMember targetNode = null;
     if (function.optimizeForWrite()) {
       targetNode = createBucket(bucketId.intValue(), 0, null /* retryTimeKeeper */);
-      if (cache.getResourceManager().isMemberHeapCritical(targetNode)
-          && !InternalResourceManager.isLowMemoryExceptionDisabled()) {
+      HeapMemoryMonitor hmm = ((InternalResourceManager) cache.getResourceManager()).getHeapMonitor();
+      if (hmm.isMemberHeapCritical(targetNode)
+          && !MemoryThresholds.isLowMemoryExceptionDisabled()) {
         Set<DistributedMember> sm = Collections.singleton((DistributedMember) targetNode);
         throw new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1.toLocalizedString(
                 new Object[] {function.getId(), sm}), sm);
@@ -3942,10 +4063,10 @@ public class PartitionedRegion extends LocalRegion implements
     }
     
     Set<InternalDistributedMember> dest = memberToBuckets.keySet();
-    if (function.optimizeForWrite() && cache.getResourceManager().
+    if (function.optimizeForWrite() && cache.getResourceManager().getHeapMonitor().
         containsHeapCriticalMembers(dest) &&
-        !InternalResourceManager.isLowMemoryExceptionDisabled()) {
-      Set<InternalDistributedMember> hcm  = cache.getResourceManager().getHeapCriticalMembers();
+        !MemoryThresholds.isLowMemoryExceptionDisabled()) {
+      Set<InternalDistributedMember> hcm  = cache.getResourceAdvisor().adviseCritialMembers();
       Set<DistributedMember> sm = SetUtils.intersection(hcm, dest);
       throw new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_FOR_0_FUNCEXEC_MEMBERS_1.toLocalizedString(
           new Object[] {function.getId(), sm}), sm);
@@ -4100,10 +4221,11 @@ public class PartitionedRegion extends LocalRegion implements
    * @param requestingClient the client requesting the object, or null if not from a client
    * @param clientEvent TODO
    * @param returnTombstones TODO
+   * @param allowRetry if false then do not retry
    */
   private Object getFromBucket(final InternalDistributedMember targetNode,
       int bucketId, final Object key, final Object aCallbackArgument,
-      boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) {
+      boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowRetry, boolean allowReadFromHDFS) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
     final int retryAttempts = calcRetry();
@@ -4133,7 +4255,7 @@ public class PartitionedRegion extends LocalRegion implements
       try {
         if (isLocal) {
           obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
-              disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+              disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS);
         }
         else {
             if (localCacheEnabled && null != (obj = localCacheGet(key))) { // OFFHEAP: copy into heap cd; TODO optimize for preferCD case
@@ -4142,17 +4264,22 @@ public class PartitionedRegion extends LocalRegion implements
               }
               return obj;
             }
-            else if (this.haveCacheLoader) {
+            else if (this.haveCacheLoader || this.hdfsStoreName != null) {
               // If the region has a cache loader, 
               // the target node is the primary server of the bucket. But, if the 
               // value can be found in a local bucket, we should first try there. 
-              if (null != ( obj = getFromLocalBucket(bucketId, key, aCallbackArgument,
-                  disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones))) {
+
+              /* MergeGemXDHDFSToGFE - reading from local bucket was disabled in GemXD */
+			  if (null != ( obj = getFromLocalBucket(bucketId, key, aCallbackArgument,
+                  disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS))) {
                 return obj;
               } 
             }
           
-          obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones);
+          //  Test hook
+          if (((LocalRegion)this).isTest())
+            ((LocalRegion)this).incCountNotFoundInLocal();
+          obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
  
           // TODO:Suranjan&Yogesh : there should be better way than this one
           String name = Thread.currentThread().getName();
@@ -4168,37 +4295,55 @@ public class PartitionedRegion extends LocalRegion implements
           logger.debug("getFromBucket Encountered PRLocallyDestroyedException", pde);
         }
         checkReadiness();
-        retryNode = getNodeForBucketReadOrLoad(bucketId);
+        if (allowRetry) {
+          retryNode = getNodeForBucketReadOrLoad(bucketId);
+        } else {
+          return null;
+        }
       }
       catch (ForceReattemptException prce) {
         prce.checkKey(key);
-        if (isDebugEnabled) {
-          logger.debug("getFromBucket: retry attempt: {} of {}", count, retryAttempts, prce);
-        }
         checkReadiness();
 
-        InternalDistributedMember lastNode = retryNode;
-        retryNode = getNodeForBucketReadOrLoad(bucketId);
-        if (lastNode.equals(retryNode)) {
-          if (retryTime == null) {
-            retryTime = new RetryTimeKeeper(this.retryTimeout);
+        if (allowRetry) {
+          InternalDistributedMember lastNode = retryNode;
+          if (isDebugEnabled) {
+            logger.debug("getFromBucket: retry attempt: {} of {}", count, retryAttempts, prce);
           }
-          if (retryTime.overMaximum()) {
-            break;
+          retryNode = getNodeForBucketReadOrLoad(bucketId);
+          if (lastNode.equals(retryNode)) {
+            if (retryTime == null) {
+              retryTime = new RetryTimeKeeper(this.retryTimeout);
+            }
+            if (retryTime.overMaximum()) {
+              break;
+            }
+            if (isDebugEnabled) {
+              logger.debug("waiting to retry node {}", retryNode);
+            }
+            retryTime.waitToRetryNode();
           }
-          if (isDebugEnabled) {
-            logger.debug("waiting to retry node {}", retryNode);
+        } else {
+          Throwable cause = prce.getCause();
+          if (cause instanceof PrimaryBucketException) {
+            throw (PrimaryBucketException)cause;
+          } else if (cause instanceof TransactionDataRebalancedException) {
+            throw (TransactionDataRebalancedException)cause;
+          } else {
+            return null;
           }
-          retryTime.waitToRetryNode();
         }
       }
       catch (PrimaryBucketException notPrimary) {
-        if (isDebugEnabled) {
-          logger.debug("getFromBucket: {} on Node {} not primary", notPrimary.getLocalizedMessage(), retryNode);
+        if (allowRetry) {
+          if (isDebugEnabled) {
+            logger.debug("getFromBucket: {} on Node {} not primary", notPrimary.getLocalizedMessage(), retryNode);
+          }
+          getRegionAdvisor().notPrimary(bucketId, retryNode);
+          retryNode = getNodeForBucketReadOrLoad(bucketId);
+        } else {
+          throw notPrimary;
         }
-        getRegionAdvisor().notPrimary(bucketId, retryNode);
-        retryNode = getNodeForBucketReadOrLoad(bucketId);
-
       }
 
       // It's possible this is a GemFire thread e.g. ServerConnection
@@ -4231,10 +4376,10 @@ public class PartitionedRegion extends LocalRegion implements
    * from it
    *   
    */
-  private Object getFromLocalBucket(int bucketId, final Object key,
+  public Object getFromLocalBucket(int bucketId, final Object key,
 		final Object aCallbackArgument, boolean disableCopyOnRead,
 		boolean preferCD, ClientProxyMembershipID requestingClient,
-		EntryEventImpl clientEvent, boolean returnTombstones)
+		EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
 		throws ForceReattemptException, PRLocallyDestroyedException {
     Object obj;
     // try reading locally. 
@@ -4243,7 +4388,7 @@ public class PartitionedRegion extends LocalRegion implements
       return null; // fixes 51657
     }
     if (readNode.equals(getMyId()) && null != ( obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
-      disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true))) {
+      disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true, allowReadFromHDFS))) {
 	  if (logger.isTraceEnabled()) {
             logger.trace("getFromBucket: Getting key {} ({}) locally - success", key, key.hashCode());
 	  }
@@ -5039,7 +5184,7 @@ public class PartitionedRegion extends LocalRegion implements
    *                 if the peer is no longer available
    */
   public Object getRemotely(InternalDistributedMember targetNode,
-      int bucketId, final Object key, final Object aCallbackArgument, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws PrimaryBucketException,
+      int bucketId, final Object key, final Object aCallbackArgument, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
       ForceReattemptException {
     Object value;
     if (logger.isDebugEnabled()) {
@@ -5047,7 +5192,7 @@ public class PartitionedRegion extends LocalRegion implements
           getPRId(), BUCKET_ID_SEPARATOR, bucketId, key);
     }
     GetResponse response = GetMessage.send(targetNode, this, key,
-        aCallbackArgument, requestingClient, returnTombstones);
+        aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS);
     this.prStats.incPartitionMessagesSent();
     value = response.waitForResponse(preferCD);
     if (clientEvent != null) {
@@ -5271,6 +5416,8 @@ public class PartitionedRegion extends LocalRegion implements
     }
     
     fillInProfile((PartitionProfile) profile);
+    
+    profile.isOffHeap = getOffHeap();
   }
 
   /** set fields that are only in PartitionProfile... */
@@ -5582,8 +5729,7 @@ public class PartitionedRegion extends LocalRegion implements
 
         // No storage found for bucket, early out preventing hot loop, bug 36819
         if (currentTarget == null) {
-          checkShutdown(); // prefer the "closed" exceptions over entry not found
-          throw new EntryNotFoundException(LocalizedStrings.PartitionedRegion_ENTRY_NOT_FOUND_FOR_KEY_0.toLocalizedString(event.getKey()));
+          checkEntryNotFound(event.getKey());
         }
         continue;
       } // pick target
@@ -6118,8 +6264,7 @@ public class PartitionedRegion extends LocalRegion implements
         }
 
         if (retryNode == null) {
-          checkShutdown(); // Prefer the closed exceptions over entry not found
-          throw new EntryNotFoundException(LocalizedStrings.PartitionedRegion_ENTRY_NOT_FOUND_FOR_KEY_0.toLocalizedString(event.getKey()));
+          checkEntryNotFound(event.getKey());
         }
         continue;
       }
@@ -7003,10 +7148,18 @@ public class PartitionedRegion extends LocalRegion implements
   }
 
   public int entryCount(Set<Integer> buckets) {
+    return entryCount(buckets, false);
+  }
+  
+  public int entryCount(Set<Integer> buckets,
+      boolean estimate) {
     Map<Integer, SizeEntry> bucketSizes = null;
+ 	if (isHDFSReadWriteRegion() && (includeHDFSResults() || estimate)) {
+      bucketSizes = getSizeForHDFS( buckets, estimate);
+	} else {
     if (buckets != null) {
       if (this.dataStore != null) {
-        List<Integer> list = new ArrayList<Integer>();
+        List<Integer> list = new ArrayList<Integer>();	
         list.addAll(buckets);
         bucketSizes = this.dataStore.getSizeLocallyForBuckets(list);
       }
@@ -7018,7 +7171,7 @@ public class PartitionedRegion extends LocalRegion implements
       HashSet recips = (HashSet)getRegionAdvisor().adviseDataStore(true);
       recips.remove(getMyId());
       if (!recips.isEmpty()) {
-        Map<Integer, SizeEntry> remoteSizes = getSizeRemotely(recips);
+        Map<Integer, SizeEntry> remoteSizes = getSizeRemotely(recips, false);
         if (logger.isDebugEnabled()) {
           logger.debug("entryCount: {} remoteSizes={}", this, remoteSizes);
         }
@@ -7035,6 +7188,7 @@ public class PartitionedRegion extends LocalRegion implements
         }
       }
     }
+ 	}
 
     int size = 0;
     if (bucketSizes != null) {
@@ -7043,6 +7197,93 @@ public class PartitionedRegion extends LocalRegion implements
       }
     }
     return size;
+ 	
+  
+  }
+
+  /**
+   * Returns the value of the local data store's
+   * getEstimatedLocalBucketSize(false), or 0 when this member has no data
+   * store.
+   */
+  @Override
+  public long getEstimatedLocalSize() {
+    // read the field once so the null check and the call use the same store
+    final PartitionedRegionDataStore localStore = this.dataStore;
+    if (localStore == null) {
+      return 0;
+    }
+    return localStore.getEstimatedLocalBucketSize(false);
+  }
+  /**
+   * Builds a bucket id to SizeEntry map for an HDFS region. Entries are taken
+   * from the local data store first and then from the other data stores
+   * (only entries flagged as primary); the loop repeats until every selected
+   * bucket has a non-null entry.
+   *
+   * @param buckets the bucket ids to report on, or null to select every
+   *          bucket visited through the region advisor
+   * @param estimate true to ask for size estimates instead of sizes
+   * @return a map holding a non-null entry for each selected bucket
+   */
+  private Map<Integer, SizeEntry> getSizeForHDFS(final Set<Integer> buckets, boolean estimate) {
+    // seed the result with a null placeholder for each bucket of interest
+    final Map<Integer, SizeEntry> sizesByBucket = new HashMap<Integer, SizeEntry>();
+    getRegionAdvisor().accept(new BucketVisitor<Map<Integer, SizeEntry>>() {
+      @Override
+      public boolean visit(RegionAdvisor advisor, ProxyBucketRegion pbr,
+          Map<Integer, SizeEntry> pending) {
+        final boolean selected = buckets == null || buckets.contains(pbr.getBucketId());
+        if (selected) {
+          pending.put(pbr.getBucketId(), null);
+          // ensure that the bucket has been created
+          pbr.getPartitionedRegion().getOrCreateNodeForBucketWrite(pbr.getBucketId(), null);
+        }
+        return true;
+      }
+    }, sizesByBucket);
+
+    final RetryTimeKeeper retryTimer = new RetryTimeKeeper(retryTimeout);
+
+    for (;;) {
+      // merge in what the local data store reports for its primaries
+      if (dataStore != null) {
+        final Map<Integer, SizeEntry> localSizes = estimate
+            ? dataStore.getSizeEstimateForLocalPrimaryBuckets()
+            : dataStore.getSizeForLocalPrimaryBuckets();
+        for (Map.Entry<Integer, SizeEntry> local : localSizes.entrySet()) {
+          if (sizesByBucket.containsKey(local.getKey())) {
+            sizesByBucket.put(local.getKey(), local.getValue());
+          }
+        }
+      }
+
+      // all done once no null placeholder is left
+      if (!sizesByBucket.containsValue(null)) {
+        return sizesByBucket;
+      }
+
+      final Set<InternalDistributedMember> otherStores = getRegionAdvisor().adviseDataStore(true);
+      otherStores.remove(getMyId());
+
+      // collect remote sizes
+      if (!otherStores.isEmpty()) {
+        Map<Integer, SizeEntry> remoteSizes = new HashMap<Integer, PartitionedRegion.SizeEntry>();
+        try {
+          remoteSizes = getSizeRemotely(otherStores, estimate);
+        } catch (ReplyException e) {
+          // Remote member will never throw ForceReattemptException or
+          // PrimaryBucketException, so any exception on the remote member
+          // should be re-thrown
+          e.handleAsUnexpected();
+        }
+        for (Map.Entry<Integer, SizeEntry> remote : remoteSizes.entrySet()) {
+          final Integer bucketId = remote.getKey();
+          final SizeEntry remoteSize = remote.getValue();
+          // only an entry reported by the bucket's primary is accepted
+          if (sizesByBucket.containsKey(bucketId) && remoteSize.isPrimary()) {
+            sizesByBucket.put(bucketId, remoteSize);
+          }
+        }
+      }
+
+      // NOTE(review): timedOut presumably throws once the retry budget is
+      // used up - confirm against PRHARedundancyProvider
+      if (retryTimer.overMaximum()) {
+        checkReadiness();
+        PRHARedundancyProvider.timedOut(this, null, null, "calculate size", retryTimer.getRetryTime());
+      }
+
+      // throttle subsequent attempts
+      retryTimer.waitForBucketsRecovery();
+    }
   }
   
   /**
@@ -7055,8 +7296,8 @@ public class PartitionedRegion extends LocalRegion implements
    * @param targetNodes
    * @return the size of all the buckets hosted on the target node.
    */
-  private Map<Integer, SizeEntry> getSizeRemotely(Set targetNodes) {
-    SizeResponse r = SizeMessage.send(targetNodes, this, null);
+  private Map<Integer, SizeEntry> getSizeRemotely(Set targetNodes, boolean estimate) {
+    SizeResponse r = SizeMessage.send(targetNodes, this, null,estimate);
     this.prStats.incPartitionMessagesSent();
     Map retVal = null;
     try {
@@ -7443,6 +7684,9 @@ public class PartitionedRegion extends LocalRegion implements
       .append("; isClosed=").append(this.isClosed)
       .append("; retryTimeout=").append(this.retryTimeout)
       .append("; serialNumber=").append(getSerialNumber())
+	  .append("; hdfsStoreName=").append(getHDFSStoreName())
+      .append("; hdfsWriteOnly=").append(getHDFSWriteOnly())
+      
       .append("; partition attributes=").append(getPartitionAttributes().toString())
       .append("; on VM ").append(getMyId())
       .append("]")
@@ -7585,6 +7829,17 @@ public class PartitionedRegion extends LocalRegion implements
   @Override
   public void destroyRegion(Object aCallbackArgument)
       throws CacheWriterException, TimeoutException {
+    //For HDFS regions, we need a data store
+    //to do the global destroy so that it can delete
+    //the data from HDFS as well.
+    if(!isDataStore() && this.dataPolicy.withHDFS()) {
+      if(destroyOnDataStore(aCallbackArgument)) {
+        //If we were able to find a data store to do the destroy,
+        //stop here.
+        //otherwise go ahead and destroy the region from this member
+        return;
+      }
+    }
 
     checkForColocatedChildren();
     getDataView().checkSupportsRegionDestroy();
@@ -7595,6 +7850,35 @@ public class PartitionedRegion extends LocalRegion implements
     basicDestroyRegion(event, true);
   }
 
+  /**
+   * Globally destroy the partitioned region by sending a message
+   * to a data store to do the destroy. Initialized data stores are tried one
+   * at a time; a member whose attempt failed with a ReplyException is not
+   * tried again.
+   *
+   * @param aCallbackArgument callback argument handed to the destroy message
+   * @return true if a data store accepted the destroy message or the region
+   *         is found to be destroyed already; false if no untried data store
+   *         is left
+   */
+  private boolean destroyOnDataStore(Object aCallbackArgument) {
+    RegionAdvisor advisor = getRegionAdvisor();
+    Set<InternalDistributedMember> attempted = new HashSet<InternalDistributedMember>();
+
+    checkReadiness();
+    while(!isDestroyed()) {
+      Set<InternalDistributedMember> available = advisor.adviseInitializedDataStore();
+      available.removeAll(attempted);
+      if(available.isEmpty()) {
+        return false;
+      }
+      InternalDistributedMember next = available.iterator().next();
+      // Record the member before sending. Without this the same failing
+      // member is picked again on every pass and the loop never reaches
+      // the "no data store left" exit above.
+      attempted.add(next);
+      try {
+        DestroyRegionOnDataStoreMessage.send(next, this, aCallbackArgument);
+        return true;
+      } catch(ReplyException e) {
+        //try the next member
+        if(logger.isTraceEnabled()) {
+          logger.trace("Error destroying " + this + " on " + next, e);
+        }
+      }
+    }
+
+    // the region was destroyed while we were looking for a data store
+    return true;
+  }
   public void destroyParallelGatewaySenderRegion(Operation op, boolean cacheWrite,
       boolean lock, boolean callbackEvents) {
 
@@ -7603,7 +7887,9 @@ public class PartitionedRegion extends LocalRegion implements
     }
 
     boolean keepWaiting = true;
-    while(true){
+
+    AsyncEventQueueImpl hdfsQueue = getHDFSEventQueue();
+    while(true) {
       List<String> pausedSenders = new ArrayList<String>();
       List<ConcurrentParallelGatewaySenderQueue> parallelQueues = new ArrayList<ConcurrentParallelGatewaySenderQueue>();
       isDestroyedForParallelWAN = true;
@@ -7719,7 +8005,12 @@ public class PartitionedRegion extends LocalRegion implements
           keepWaiting = false;
         }
       }
-    } 
+    }
+    
+    if(hdfsQueue != null) {
+      hdfsQueue.destroy();
+      cache.removeAsyncEventQueue(hdfsQueue);
+    }
   }
         
   @Override
@@ -7900,6 +8191,9 @@ public class PartitionedRegion extends LocalRegion implements
     final boolean isClose = event.getOperation().isClose();
     destroyPartitionedRegionLocally(!isClose);
     destroyCleanUp(event, serials);
+	if(!isClose) {
+      destroyHDFSData();
+    }
     return true;
   }
 
@@ -8192,6 +8486,8 @@ public class PartitionedRegion extends LocalRegion implements
       }
     }
     
+    HDFSRegionDirector.getInstance().clear(getFullPath());
+    
     RegionLogger.logDestroy(getName(), cache.getMyId(), null, op.isClose());
   }
 
@@ -9800,10 +10096,13 @@ public class PartitionedRegion extends LocalRegion implements
           keyInfo.getKey(), keyInfo.getValue(), keyInfo.getCallbackArg());
       keyInfo.setBucketId(bucketId);
     }
-    DistributedMember primary = getRegionAdvisor().getPrimaryMemberForBucket(
-        bucketId);
-    if (!primary.equals(getMyId())) {
-      throw new PrimaryBucketException("Bucket "+bucketId+" is not primary. Current primary holder is "+primary);
+    if (keyInfo.isCheckPrimary()) {
+      DistributedMember primary = getRegionAdvisor().getPrimaryMemberForBucket(
+          bucketId);
+      if (!primary.equals(getMyId())) {
+        throw new PrimaryBucketException("Bucket " + bucketId
+            + " is not primary. Current primary holder is " + primary);
+      }
     }
     BucketRegion br = this.dataStore.getLocalBucketById(bucketId);
     RegionEntry re = br.basicGetEntry(keyInfo.getKey());
@@ -10057,17 +10356,21 @@ public class PartitionedRegion extends LocalRegion implements
   }
 
   @Override
-  protected void setHeapThresholdFlag(MemoryEvent event) {
-    if (event.getType().isCriticalUp()) {
+  protected void setMemoryThresholdFlag(MemoryEvent event) {
+    if (event.getState().isCritical()
+        && !event.getPreviousState().isCritical()
+        && (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
       // update proxy bucket, so that we can reject operations on those buckets.
       getRegionAdvisor().markBucketsOnMember(event.getMember(), true/*sick*/);
-    } else if (event.getType().isCriticalDown() || event.getType().isCriticalDisabled()) {
+    } else if (!event.getState().isCritical()
+        && event.getPreviousState().isCritical()
+        && (event.getType() == ResourceType.HEAP_MEMORY || (event.getType() == ResourceType.OFFHEAP_MEMORY && getOffHeap()))) {
       getRegionAdvisor().markBucketsOnMember(event.getMember(), false/*not sick*/);
     }
   }
 
   @Override
-  public void initialCriticalMembers(boolean localHeapIsCritical,
+  public void initialCriticalMembers(boolean localMemoryIsCritical,
       Set<InternalDistributedMember> critialMembers) {
     for (InternalDistributedMember idm: critialMembers) {
       getRegionAdvisor().markBucketsOnMember(idm, true/*sick*/);
@@ -10168,12 +10471,16 @@ public class PartitionedRegion extends LocalRegion implements
         keyInfo.setBucketId(bucketId);
       }
       br = ds.getInitializedBucketWithKnownPrimaryForId(null, bucketId);
-      try {
-        br.checkForPrimary();
-      } catch(PrimaryBucketException pbe) {
-         RuntimeException re = new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString());
-         re.initCause(pbe);
-         throw re;
+      if (keyInfo.isCheckPrimary()) {
+        try {
+          br.checkForPrimary();
+        } catch (PrimaryBucketException pbe) {
+          RuntimeException re = new TransactionDataRebalancedException(
+              LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
+                  .toLocalizedString());
+          re.initCause(pbe);
+          throw re;
+        }
       }
     } catch(RegionDestroyedException rde) {
       RuntimeException re = new TransactionDataNotColocatedException(LocalizedStrings.PartitionedRegion_KEY_0_NOT_COLOCATED_WITH_TRANSACTION.toLocalizedString(entryKey));
@@ -10210,14 +10517,16 @@ public class PartitionedRegion extends LocalRegion implements
       } catch (ForceReattemptException e) {
         // create a new bucket
         InternalDistributedMember member = createBucket(bucketId, 0, null);
-        if (!getMyId().equals(member)) {
+        if (!getMyId().equals(member) && keyInfo.isCheckPrimary()) {
           throw new PrimaryBucketException("Bucket "+bucketId+" is not primary. Current primary holder is "+member);
         }
         count++;
       }
     }
     Assert.assertTrue(br != null, "Could not create storage for Entry");
-      br.checkForPrimary();
+      if (keyInfo.isCheckPrimary()) {
+        br.checkForPrimary();
+      }
     } catch(PrimaryBucketException pbe) {
       RuntimeException re = new TransactionDataRebalancedException(LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString());
       re.initCause(pbe);
@@ -10400,7 +10709,13 @@ public class PartitionedRegion extends LocalRegion implements
       PartitionedIndex parIndex = new PartitionedIndex(indexType, indexName, PartitionedRegion.this,
           indexedExpression, fromClause,  imports); // imports can be null
       String modifiedFromClause;
-
+      //In cases where we have no data yet (creation from cache xml), the populated flag would be left as false.
+      //That is not really an issue, as a put will trigger bucket index creation, which should set the flag to true.
+      //However, if the region is empty, we should set this flag to true so it will be reported as used even though
+      //there is no data in the region
+      if (!it.hasNext()) {
+        parIndex.setPopulated(true);
+      }
       while (it.hasNext()) {
         Map.Entry entry = (Map.Entry) it.next();
         Region bucket = (Region) entry.getValue();
@@ -10645,8 +10960,7 @@ public class PartitionedRegion extends LocalRegion implements
         }
 
         if (retryNode == null) {
-          checkShutdown(); // Prefer the closed exceptions over entry not found
-          throw new EntryNotFoundException(LocalizedStrings.PartitionedRegion_ENTRY_NOT_FOUND_FOR_KEY_0.toLocalizedString(event.getKey()));
+          checkEntryNotFound(event.getKey());
         }
         continue;
       }
@@ -10768,6 +11082,321 @@ public class PartitionedRegion extends LocalRegion implements
     }  
   }
 
+  /**
+   * Clear local primary buckets.
+   * This is currently only used by gemfirexd truncate table
+   * to clear the partitioned region.
+   * <p>
+   * Does nothing unless getLocalMaxMemory() is positive. Otherwise the
+   * primary lock of every local primary bucket region is taken, the HDFS
+   * dispatcher is paused, each bucket region is cleared, and finally the
+   * dispatcher is resumed and the locks are released. Exceptions from
+   * individual buckets are logged at debug level and ignored.
+   */
+  public void clearLocalPrimaries() {
+    // NOTE(review): the comment inherited here referred to a RecoveryLock
+    // taken "above"; no such lock is acquired in this method.
+    if (getLocalMaxMemory() <= 0) {
+      return;
+    }
+
+    // acquire the primary bucket locks
+    // do this in a loop to handle the corner cases where a primary
+    // bucket region ceases to be so when we actually take the lock
+    // [sumedh] do we need both recovery lock and bucket locks?
+    Set<BucketRegion> lockedRegions = null;
+    final List<BucketRegion> lockedThisPass = new ArrayList<BucketRegion>();
+    boolean done = false;
+    while (!done) {
+      lockedRegions = getDataStore().getAllLocalPrimaryBucketRegions();
+      lockedThisPass.clear();
+      done = true;
+      for (BucketRegion br : lockedRegions) {
+        try {
+          br.doLockForPrimary(false);
+          lockedThisPass.add(br);
+        } catch (RegionDestroyedException rde) {
+          done = false;
+          break;
+        } catch (PrimaryBucketException pbe) {
+          done = false;
+          break;
+        } catch (Exception e) {
+          // ignore any other exception
+          logger.debug(
+              "GemFireContainer#clear: ignoring exception "
+                  + "in bucket lock acquire", e);
+        }
+      }
+      if (!done) {
+        // The set of primaries changed under us and will be re-read. Give
+        // back the locks taken in this pass first; otherwise they would be
+        // taken a second time or never released at all.
+        unlockPrimariesIgnoringErrors(lockedThisPass);
+      }
+    }
+
+    try {
+      //hoplogs - pause HDFS dispatcher while we
+      //clear the buckets to avoid missing some files
+      //during the clear
+      pauseHDFSDispatcher();
+      try {
+        // now clear the bucket regions; we go through the primary bucket
+        // regions so there is distribution for every bucket but that
+        // should be performant enough
+        for (BucketRegion br : lockedRegions) {
+          try {
+            br.clear();
+          } catch (Exception e) {
+            // ignore any other exception
+            logger.debug(
+                "GemFireContainer#clear: ignoring exception "
+                    + "in bucket clear", e);
+          }
+        }
+      } finally {
+        resumeHDFSDispatcher();
+      }
+    } finally {
+      // release the bucket locks even when pausing or resuming the
+      // dispatcher failed
+      unlockPrimariesIgnoringErrors(lockedRegions);
+    }
+  }
+
+  /** Releases the primary lock of each given bucket region, ignoring failures. */
+  private void unlockPrimariesIgnoringErrors(Iterable<BucketRegion> regions) {
+    for (BucketRegion br : regions) {
+      try {
+        br.doUnlockForPrimary();
+      } catch (Exception e) {
+        // ignore all exceptions at this stage
+        logger.debug(
+            "GemFireContainer#clear: ignoring exception "
+                + "in bucket lock release", e);
+      }
+    }
+  }
+
+  
+  /**
+   * Destroy all data in HDFS, if this region is using HDFS persistence
+   * (i.e. it has an HDFS store name). An IOException raised while deleting
+   * is logged as a warning and not propagated.
+   */
+  private void destroyHDFSData() {
+    final boolean hasHdfsStore = getHDFSStoreName() != null;
+    if (hasHdfsStore) {
+      try {
+        hdfsManager.destroyData();
+      } catch (IOException ioe) {
+        logger.warn(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA, ioe);
+      }
+    }
+  }
+
+  /**
+   * Pauses dispatching on the HDFS event processor and waits for the
+   * dispatcher to pause. Does nothing when this is not an HDFS region or no
+   * event processor is returned.
+   */
+  private void pauseHDFSDispatcher() {
+    if (isHDFSRegion()) {
+      final AbstractGatewaySenderEventProcessor dispatcher = getHDFSEventProcessor();
+      if (dispatcher != null) {
+        dispatcher.pauseDispatching();
+        dispatcher.waitForDispatcherToPause();
+      }
+    }
+  }
+  
+  /**
+   * Get the statistics for the HDFS event queue associated with this region.
+   *
+   * @return the queue's statistics, or null if this region has no HDFS
+   *         event queue
+   */
+  public AsyncEventQueueStats getHDFSEventQueueStats() {
+    final AsyncEventQueueImpl hdfsQueue = getHDFSEventQueue();
+    return hdfsQueue == null ? null : hdfsQueue.getStatistics();
+  }
+  
+  /**
+   * Returns the event processor of the gateway sender behind this region's
+   * HDFS event queue.
+   *
+   * @return the event processor, or null when getHDFSEventQueue() finds no
+   *         queue; callers such as pauseHDFSDispatcher() already treat null
+   *         as "nothing to do"
+   */
+  protected AbstractGatewaySenderEventProcessor getHDFSEventProcessor() {
+    final AsyncEventQueueImpl asyncQ = getHDFSEventQueue();
+    if (asyncQ == null) {
+      // no queue name for this region, or the queue is not registered with
+      // the cache; previously this fell through to a NullPointerException
+      return null;
+    }
+    final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
+    return gatewaySender.getEventProcessor();
+  }
+
+  /**
+   * Looks up this region's HDFS event queue in the cache, using the id
+   * returned by getHDFSEventQueueName().
+   *
+   * @return null when there is no queue id; otherwise whatever the cache
+   *         lookup returns for that id
+   */
+  public AsyncEventQueueImpl getHDFSEventQueue() {
+    final String queueId = getHDFSEventQueueName();
+    if (queueId != null) {
+      return (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(queueId);
+    }
+    return null;
+  }
+  
+  /**
+   * Resumes dispatching on the HDFS event processor. Does nothing when this
+   * is not an HDFS region or no event processor is returned.
+   */
+  private void resumeHDFSDispatcher() {
+    if (isHDFSRegion()) {
+      final AbstractGatewaySenderEventProcessor dispatcher = getHDFSEventProcessor();
+      if (dispatcher != null) {
+        dispatcher.resumeDispatching();
+      }
+    }
+  }
+
+  /**
+   * Determines the name of the HDFS event queue used by this region. For a
+   * colocated region the name is derived from the full path of the region
+   * returned by ColocationHelper.getLeaderRegionName; otherwise it is
+   * derived from this region's own full path.
+   *
+   * @return the queue name, or null when getDataPolicy().withHDFS() is false
+   */
+  protected String getHDFSEventQueueName() {
+    if (!this.getDataPolicy().withHDFS()) {
+      return null;
+    }
+    final boolean colocated = this.getPartitionAttributes().getColocatedWith() != null;
+    if (!colocated) {
+      return HDFSStoreFactoryImpl.getEventQueueName(getFullPath());
+    }
+    final PartitionedRegion leadRegion = ColocationHelper.getLeaderRegionName(this);
+    return HDFSStoreFactoryImpl.getEventQueueName(leadRegion.getFullPath());
+  }
+
+  /**
+   * schedules compaction on all members where this region is hosted.
+   * Buckets that did not report success are retried, at most
+   * HDFSForceCompactionFunction.FORCE_COMPACTION_MAX_RETRIES times and only
+   * while buckets remain to be retried.
+   * 
+   * @param isMajor
+   *          true for major compaction
+   * @param maxWaitTime
+   *          time to wait for the operation to complete, 0 will wait forever.
+   *          Must not be null (the value is unboxed); it is multiplied by
+   *          1000 before use, so it is presumably in seconds - confirm
+   * @throws UnsupportedOperationException
+   *           if this region is not an HDFS read-write region
+   * @throws FunctionException
+   *           if some buckets still have not been compacted at the end
+   */
+  @Override
+  public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) {
+    if (!this.isHDFSReadWriteRegion()) {
+      if (this.isHDFSRegion()) {
+        throw new UnsupportedOperationException(
+            LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY
+                .toLocalizedString(getName()));
+      }
+      throw new UnsupportedOperationException(
+          LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+              .toLocalizedString(getName()));
+    }
+    // send request to remote data stores
+    long start = System.currentTimeMillis();
+    int waitTime = maxWaitTime * 1000;
+    HDFSForceCompactionArgs args = new HDFSForceCompactionArgs(getRegionAdvisor().getBucketSet(), isMajor, waitTime);
+    HDFSForceCompactionResultCollector rc = new HDFSForceCompactionResultCollector();
+    AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
+    execution.setWaitOnExceptionFlag(true); // wait for all exceptions
+    if (logger.isDebugEnabled()) {
+      logger.debug("HDFS: ForceCompat invoking function with arguments "+args);
+    }
+    execution.execute(HDFSForceCompactionFunction.ID);
+    // The returned list is not needed, but the call is kept: getResult()
+    // presumably waits for the members to reply - confirm before removing.
+    rc.getResult();
+    Set<Integer> successfulBuckets = rc.getSuccessfulBucketIds();
+    if (rc.shouldRetry()) {
+      int retries = 0;
+      while (retries < HDFSForceCompactionFunction.FORCE_COMPACTION_MAX_RETRIES) {
+        waitTime -= System.currentTimeMillis() - start;
+        if (maxWaitTime > 0 && waitTime < 0) {
+          break;
+        }
+        start = System.currentTimeMillis();
+        retries++;
+        Set<Integer> retryBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
+        retryBuckets.removeAll(successfulBuckets);
+        if (retryBuckets.isEmpty()) {
+          // every bucket has been compacted; another round would only
+          // re-run the function with nothing to do
+          break;
+        }
+
+        for (int bucketId : retryBuckets) {
+          getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper(waitTime));
+          long now = System.currentTimeMillis();
+          waitTime -= now - start;
+          start = now;
+        }
+
+        args = new HDFSForceCompactionArgs(retryBuckets, isMajor, waitTime);
+        rc = new HDFSForceCompactionResultCollector();
+        execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
+        execution.setWaitOnExceptionFlag(true); // wait for all exceptions
+        if (logger.isDebugEnabled()) {
+          logger.debug("HDFS: ForceCompat re-invoking function with arguments "+args+" filter:"+retryBuckets);
+        }
+        execution.execute(HDFSForceCompactionFunction.ID);
+        rc.getResult(); // see the note on the first getResult() call
+        successfulBuckets.addAll(rc.getSuccessfulBucketIds());
+      }
+    }
+    if (successfulBuckets.size() != getRegionAdvisor().getBucketSet().size()) {
+      checkReadiness();
+      Set<Integer> unsuccessfulBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
+      unsuccessfulBuckets.removeAll(successfulBuckets);
+      throw new FunctionException("Could not run compaction on following buckets:"+unsuccessfulBuckets);
+    }
+  }
+
+  /**
+   * Schedules compaction on local buckets. Nothing is scheduled when this
+   * member is not a data store, has no hdfsManager, or is given a null or
+   * empty bucket set.
+   *
+   * @param buckets the set of buckets to compact
+   * @param isMajor true for major compaction
+   * @param time TODO use this
+   * @return a list of futures for the scheduled compaction tasks; empty when
+   *         nothing was scheduled
+   */
+  public List<Future<CompactionStatus>> forceLocalHDFSCompaction(Set<Integer> buckets, boolean isMajor, long time) {
+    final List<Future<CompactionStatus>> scheduled = new ArrayList<Future<CompactionStatus>>();
+    final boolean nothingToCompact =
+        !isDataStore() || hdfsManager == null || buckets == null || buckets.isEmpty();
+    if (nothingToCompact) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "HDFS: did not schedule local " + (isMajor ? "Major" : "Minor") + " compaction");
+      }
+      return scheduled;
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: scheduling local " + (isMajor ? "Major" : "Minor") + " compaction for buckets:"+buckets);
+    }
+    // one compaction request per bucket organizer
+    final Collection<HoplogOrganizer> bucketOrganizers = hdfsManager.getBucketOrganizers(buckets);
+    for (HoplogOrganizer organizer : bucketOrganizers) {
+      final Future<CompactionStatus> pending = organizer.forceCompaction(isMajor);
+      scheduled.add(pending);
+    }
+    return scheduled;
+  }
+  
+  /**
+   * Flushes this region's HDFS queue through
+   * HDFSFlushQueueFunction.flushQueue.
+   *
+   * @param maxWaitTime handed unchanged to the flush function
+   * @throws UnsupportedOperationException if this is not an HDFS region
+   */
+  @Override
+  public void flushHDFSQueue(int maxWaitTime) {
+    if (this.isHDFSRegion()) {
+      HDFSFlushQueueFunction.flushQueue(this, maxWaitTime);
+      return;
+    }
+    throw new UnsupportedOperationException(
+        LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+            .toLocalizedString(getName()));
+  }
+  
+  /**
+   * Runs HDFSLastCompactionTimeFunction on this region and returns the
+   * smallest non-zero timestamp among the results.
+   *
+   * @return the smallest non-zero timestamp reported, or 0 when there is none
+   * @throws UnsupportedOperationException if this region is not an HDFS
+   *           read-write region
+   */
+  @Override
+  public long lastMajorHDFSCompaction() {
+    if (!this.isHDFSReadWriteRegion()) {
+      final String reason = this.isHDFSRegion()
+          ? LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY.toLocalizedString(getName())
+          : LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE.toLocalizedString(getName());
+      throw new UnsupportedOperationException(reason);
+    }
+    final List<Long> reported = (List<Long>) FunctionService.onRegion(this)
+        .execute(HDFSLastCompactionTimeFunction.ID)
+        .getResult();
+    if (logger.isDebugEnabled()) {
+      logger.debug("HDFS: Result of LastCompactionTimeFunction "+reported);
+    }
+    // Long.MAX_VALUE doubles as the "no timestamp seen" marker
+    long earliest = Long.MAX_VALUE;
+    for (long timestamp : reported) {
+      if (timestamp == 0) {
+        continue; // zero entries do not count as a timestamp
+      }
+      earliest = Math.min(earliest, timestamp);
+    }
+    return earliest == Long.MAX_VALUE ? 0 : earliest;
+  }
+
+  /**
+   * Returns the smallest non-zero last-major-compaction timestamp among the
+   * bucket organizers of the local hdfsManager.
+   *
+   * @return that timestamp, or 0 when this member is not a data store, has
+   *         no hdfsManager, or no organizer reports a non-zero timestamp
+   */
+  public long lastLocalMajorHDFSCompaction() {
+    final boolean hasLocalHdfsData = isDataStore() && hdfsManager != null;
+    if (!hasLocalHdfsData) {
+      // nothing to do
+      return 0;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: getting local Major compaction time");
+    }
+    // Long.MAX_VALUE doubles as the "no timestamp seen" marker
+    long earliest = Long.MAX_VALUE;
+    final Collection<HoplogOrganizer> bucketOrganizers = hdfsManager.getBucketOrganizers();
+    for (HoplogOrganizer organizer : bucketOrganizers) {
+      final long timestamp = organizer.getLastMajorCompactionTimestamp();
+      if (timestamp != 0) {
+        earliest = Math.min(earliest, timestamp);
+      }
+    }
+    final long result = earliest == Long.MAX_VALUE ? 0 : earliest;
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: local Major compaction time: "+result);
+    }
+    return result;
+  }
+
+
   public void shadowPRWaitForBucketRecovery() {
     assert this.isShadowPR();
     PartitionedRegion userPR = ColocationHelper.getLeaderRegion(this);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
index 33bc158..d567b5a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
@@ -56,6 +56,7 @@ import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionException;
 import com.gemstone.gemfire.cache.execute.ResultSender;
 import com.gemstone.gemfire.cache.query.QueryInvalidException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
 import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
 import com.gemstone.gemfire.cache.query.internal.QCompiler;
 import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
@@ -467,6 +468,12 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
                 if (getPartitionedRegion().getColocatedWith() == null) {
                   buk.getBucketAdvisor().setShadowBucketDestroyed(false);
                 }
+                if (getPartitionedRegion().isShadowPR()) {
+                  getPartitionedRegion().getColocatedWithRegion()
+                  .getRegionAdvisor()
+                  .getBucketAdvisor(possiblyFreeBucketId)
+                  .setShadowBucketDestroyed(false);
+                }
                 bukReg = createBucketRegion(possiblyFreeBucketId);
                 //Mark the bucket as hosting and distribute to peers
                 //before we release the dlock. This makes sure that our peers
@@ -585,6 +592,9 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
         		.getBucketTmpQueue(bucketId);
         if (tempQueue != null) {
           synchronized (tempQueue) {
+            for (GatewaySenderEventImpl event : tempQueue) {
+              event.release();
+            }
             tempQueue.clear();  
           }
         }
@@ -770,6 +780,7 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
     }
     
     factory.setCompressor(this.partitionedRegion.getCompressor());
+    factory.setOffHeap(this.partitionedRegion.getOffHeap());
     
     factory.setBucketRegion(true); // prevent validation problems
     RegionAttributes attributes = factory.create();
@@ -1265,18 +1276,17 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
                             final long lastModified)
   throws PrimaryBucketException, ForceReattemptException {
     final BucketRegion br = getInitializedBucketForId(event.getKey(), bucketId);
-    return putLocally(bucketId, event, ifNew, ifOld, expectedOldValue,
-        requireOldValue, lastModified, br);
+    return putLocally(br, event, ifNew, ifOld, expectedOldValue,
+        requireOldValue, lastModified);
   }
 
-  public boolean putLocally(final Integer bucketId,
+  public boolean putLocally(final BucketRegion bucketRegion,
                             final EntryEventImpl event,
                             boolean ifNew,
                             boolean ifOld,
                             Object expectedOldValue,
                             boolean requireOldValue,
-                            final long lastModified,
-                            final BucketRegion bucketRegion)
+                            final long lastModified)
   throws PrimaryBucketException, ForceReattemptException {
     boolean didPut=false; // false if entry put fails
     
@@ -2026,7 +2036,7 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
       ForceReattemptException, PRLocallyDestroyedException
   {
 	  return getLocally(bucketId, key,aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, 
-			  clientEvent, returnTombstones, false);
+			  clientEvent, returnTombstones, false, false);
   }
   /**
    * Returns value corresponding to this key.
@@ -2045,19 +2055,19 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
    */
   public Object getLocally(int bucketId, final Object key,
       final Object aCallbackArgument, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, 
-      boolean returnTombstones, boolean opScopeIsLocal) throws PrimaryBucketException,
+      boolean returnTombstones, boolean opScopeIsLocal, boolean allowReadFromHDFS) throws PrimaryBucketException,
       ForceReattemptException, PRLocallyDestroyedException
   {
     final BucketRegion bucketRegion = getInitializedBucketForId(key, Integer.valueOf(bucketId));
     //  check for primary (when a loader is present) done deeper in the BucketRegion
     Object ret=null;
     if (logger.isDebugEnabled()) {
-      logger.debug("getLocally:  key {}) bucketId={}{}{} region {} returnTombstones {}", key,
-          this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones);
+      logger.debug("getLocally:  key {}) bucketId={}{}{} region {} returnTombstones {} allowReadFromHDFS {}", key,
+          this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones, allowReadFromHDFS);
     }
     invokeBucketReadHook();
     try {
-      ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal);
+      ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, allowReadFromHDFS, false);
       checkIfBucketMoved(bucketRegion);
     }
     catch (RegionDestroyedException rde) {
@@ -2089,7 +2099,7 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
    * @throws PrimaryBucketException if the locally managed bucket is not primary
    * @see #getLocally(int, Object, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
    */
-  public RawValue getSerializedLocally(KeyInfo keyInfo, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones) throws PrimaryBucketException,
+  public RawValue getSerializedLocally(KeyInfo keyInfo, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
       ForceReattemptException {
     final BucketRegion bucketRegion = getInitializedBucketForId(keyInfo.getKey(), keyInfo.getBucketId());
     //  check for primary (when loader is present) done deeper in the BucketRegion
@@ -2100,7 +2110,7 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
     invokeBucketReadHook();
 
     try {
-      RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, clientEvent, returnTombstones);
+      RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS);
       checkIfBucketMoved(bucketRegion);
       return result;
     } catch (RegionDestroyedException rde) {
@@ -2135,7 +2145,7 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
    *           if the PartitionRegion is locally destroyed
    */
   public EntrySnapshot getEntryLocally(int bucketId, final Object key,
-      boolean access, boolean allowTombstones)
+      boolean access, boolean allowTombstones, boolean allowReadFromHDFS)
       throws EntryNotFoundException, PrimaryBucketException,
       ForceReattemptException, PRLocallyDestroyedException
   {
@@ -2148,7 +2158,12 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
     EntrySnapshot res = null;
     RegionEntry ent = null;
     try {
-      ent = bucketRegion.entries.getEntry(key);
+      if (allowReadFromHDFS) {
+        ent = bucketRegion.entries.getEntry(key);
+      }
+      else {
+        ent = bucketRegion.entries.getOperationalEntryInVM(key);
+      }
 
       if (ent == null) {
         this.getPartitionedRegion().checkReadiness();
@@ -2257,8 +2272,15 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
     invokeBucketReadHook();
     try{
       if (r != null) {
+        Set keys = r.keySet(allowTombstones);
+        if (getPartitionedRegion().isHDFSReadWriteRegion()) {
+          // hdfs regions can't copy all keys into memory
+          ret = keys;
+
+        } else  { 
         // A copy is made so that the bucket is free to move
         ret = new HashSet(r.keySet(allowTombstones));
+		}
         checkIfBucketMoved(r);
       }
     }
@@ -2308,8 +2330,8 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
    * 5) updateBucket2Size if bucket is on more than 1 node or else bucket
    * listners would take care of size update. <br>
    * 
-   * @param bucketId
-   *          the bucket id of the key
+   * @param bucketRegion
+   *          the bucket to do the create in
    * @param event
    *          the particulars of the operation
    * @param ifNew
@@ -2323,7 +2345,7 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
    * @throws EntryExistsException
    *           if an entry with this key already exists
    */
-  public boolean createLocally(Integer bucketId,
+  public boolean createLocally(final BucketRegion bucketRegion,
                                final EntryEventImpl event,
                                boolean ifNew,
                                boolean ifOld,
@@ -2331,8 +2353,6 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
                                final long lastModified)
   throws ForceReattemptException {
     boolean result = false;
-    final BucketRegion bucketRegion = getInitializedBucketForId(event.getKey(), bucketId);
-
     try{
       event.setRegion(bucketRegion); // convert to the bucket region
       if (event.isOriginRemote()) {
@@ -2494,6 +2514,10 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
     return getSizeLocallyForBuckets(getAllLocalPrimaryBucketIds());
   }
 
+  public Map<Integer, SizeEntry> getSizeEstimateForLocalPrimaryBuckets() {
+    return getSizeEstimateLocallyForBuckets(getAllLocalPrimaryBucketIds());
+  }
+
   /**
    * This calculates size of all the primary bucket regions for the list of bucketIds.
    * 
@@ -2501,16 +2525,31 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
    * @return the size of all the primary bucket regions for the list of bucketIds.
    */
   public Map<Integer, SizeEntry> getSizeLocallyForBuckets(Collection<Integer> bucketIds) {
+    return getSizeLocallyForPrimary(bucketIds, false);
+  }
+
+  public Map<Integer, SizeEntry> getSizeEstimateLocallyForBuckets(Collection<Integer> bucketIds) {
+    return getSizeLocallyForPrimary(bucketIds, true);
+  }
+
+  private Map<Integer, SizeEntry> getSizeLocallyForPrimary(Collection<Integer> bucketIds, boolean estimate) {
     Map<Integer, SizeEntry> mySizeMap;
     if (this.localBucket2RegionMap.isEmpty()) {
       return Collections.emptyMap();
     }
     mySizeMap = new HashMap<Integer, SizeEntry>(this.localBucket2RegionMap.size());
-    BucketRegion r;
+    BucketRegion r = null;
     for(Integer bucketId : bucketIds) {
       try {
         r = getInitializedBucketForId(null, bucketId);
-        mySizeMap.put(bucketId, new SizeEntry(r.size(), r.getBucketAdvisor().isPrimary()));
+        mySizeMap.put(bucketId, new SizeEntry(estimate ? r.sizeEstimate() : r.size(), r.getBucketAdvisor().isPrimary()));
+//        if (getLogWriter().fineEnabled() && r.getBucketAdvisor().isPrimary()) {
+//          r.verifyTombstoneCount();
+//        }
+      } catch (PrimaryBucketException skip) {
+        // sizeEstimate() throws this exception because it will not try to read from HDFS on a secondary bucket;
+        // the skipped bucket is retried in PartitionedRegion.getSizeForHDFS(). This fixes bug 49033.
+        continue;
       } catch (ForceReattemptException skip) {
         continue;
       } catch(RegionDestroyedException skip) {
@@ -2844,6 +2883,29 @@ public final class PartitionedRegionDataStore implements HasCachePerfStats
     return bucketIds;
   }
 
+  /** A fast estimate of the total size of the locally hosted buckets, optionally counting only primary buckets. */
+  public long getEstimatedLocalBucketSize(boolean primaryOnly) {
+    long size = 0;
+    for (BucketRegion br : localBucket2RegionMap.values()) {
+      if (!primaryOnly || br.getBucketAdvisor().isPrimary()) {
+        size += br.getEstimatedLocalSize();
+      }
+    }
+    return size;
+  }
+
+  /** A fast estimate of the total size of the given buckets; bucket ids with no locally hosted bucket are skipped. */
+  public long getEstimatedLocalBucketSize(Set<Integer> bucketIds) {
+    long size = 0;
+    for (Integer bid : bucketIds) {
+      BucketRegion br = localBucket2RegionMap.get(bid);
+      if (br != null) {
+        size += br.getEstimatedLocalSize();
+      }
+    }
+    return size;
+  }
+
   public Object getLocalValueInVM(final Object key, int bucketId)
   {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
index 1e093c4..ca2862c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
@@ -59,10 +59,10 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
   @Override
   public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
       boolean generateCallbacks, Object value, boolean disableCopyOnRead,
-      boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) {
+      boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
     TXStateProxy tx = r.cache.getTXMgr().internalSuspend();
     try {
-      return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+      return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
     } finally {
       r.cache.getTXMgr().resume(tx);
     }
@@ -75,9 +75,9 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
   }
   @Override
   public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient,
-  EntryEventImpl clientEvent, boolean returnTombstones) throws DataLocationException {
+  EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     PartitionedRegion pr = (PartitionedRegion)localRegion;
-    return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, clientEvent, returnTombstones);
+    return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS);
   }
   @Override
   public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
@@ -110,7 +110,7 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
       boolean allowTombstones) throws DataLocationException {
     PartitionedRegion pr = (PartitionedRegion)localRegion;
     return pr.getDataStore().getEntryLocally(keyInfo.getBucketId(),
-        keyInfo.getKey(), false, allowTombstones);
+        keyInfo.getKey(), false, allowTombstones, true);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
index 10dc256..965f96c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
@@ -115,6 +115,8 @@ public class PartitionedRegionHelper
     Set policies = new HashSet();
     policies.add(DEFAULT_DATA_POLICY);
     policies.add(DataPolicy.PERSISTENT_PARTITION);
+    policies.add(DataPolicy.HDFS_PARTITION);
+    policies.add(DataPolicy.HDFS_PERSISTENT_PARTITION);
 //    policies.add(DataPolicy.NORMAL);
     ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
   }


Mime
View raw message