geode-commits mailing list archives

From kl...@apache.org
Subject [74/94] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop
Date Tue, 23 Feb 2016 20:24:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 0000000,2092508..b6d8c49
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@@ -1,0 -1,12975 +1,12975 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.cache;
+ 
+ import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.*;
+ 
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.AbstractSet;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.EnumSet;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Hashtable;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.RejectedExecutionException;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.CancelCriterion;
+ import com.gemstone.gemfire.CancelException;
+ import com.gemstone.gemfire.CopyHelper;
+ import com.gemstone.gemfire.DataSerializable;
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.DeltaSerializationException;
+ import com.gemstone.gemfire.InternalGemFireError;
+ import com.gemstone.gemfire.InternalGemFireException;
+ import com.gemstone.gemfire.LogWriter;
+ import com.gemstone.gemfire.SystemFailure;
+ import com.gemstone.gemfire.admin.internal.SystemMemberCacheEventProcessor;
+ import com.gemstone.gemfire.cache.AttributesMutator;
+ import com.gemstone.gemfire.cache.Cache;
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.CacheEvent;
+ import com.gemstone.gemfire.cache.CacheException;
+ import com.gemstone.gemfire.cache.CacheListener;
+ import com.gemstone.gemfire.cache.CacheLoader;
+ import com.gemstone.gemfire.cache.CacheLoaderException;
+ import com.gemstone.gemfire.cache.CacheRuntimeException;
+ import com.gemstone.gemfire.cache.CacheStatistics;
+ import com.gemstone.gemfire.cache.CacheWriter;
+ import com.gemstone.gemfire.cache.CacheWriterException;
+ import com.gemstone.gemfire.cache.CustomExpiry;
+ import com.gemstone.gemfire.cache.DataPolicy;
+ import com.gemstone.gemfire.cache.DiskAccessException;
+ import com.gemstone.gemfire.cache.DiskStoreFactory;
+ import com.gemstone.gemfire.cache.DiskWriteAttributes;
+ import com.gemstone.gemfire.cache.DiskWriteAttributesFactory;
+ import com.gemstone.gemfire.cache.EntryDestroyedException;
+ import com.gemstone.gemfire.cache.EntryExistsException;
+ import com.gemstone.gemfire.cache.EntryNotFoundException;
+ import com.gemstone.gemfire.cache.EvictionAttributes;
+ import com.gemstone.gemfire.cache.ExpirationAttributes;
+ import com.gemstone.gemfire.cache.FailedSynchronizationException;
+ import com.gemstone.gemfire.cache.InterestRegistrationEvent;
+ import com.gemstone.gemfire.cache.InterestResultPolicy;
+ import com.gemstone.gemfire.cache.LoaderHelper;
+ import com.gemstone.gemfire.cache.LowMemoryException;
+ import com.gemstone.gemfire.cache.Operation;
+ import com.gemstone.gemfire.cache.PartitionedRegionStorageException;
+ import com.gemstone.gemfire.cache.Region;
+ import com.gemstone.gemfire.cache.RegionAttributes;
+ import com.gemstone.gemfire.cache.RegionDestroyedException;
+ import com.gemstone.gemfire.cache.RegionEvent;
+ import com.gemstone.gemfire.cache.RegionExistsException;
+ import com.gemstone.gemfire.cache.RegionMembershipListener;
+ import com.gemstone.gemfire.cache.RegionReinitializedException;
+ import com.gemstone.gemfire.cache.Scope;
+ import com.gemstone.gemfire.cache.StatisticsDisabledException;
+ import com.gemstone.gemfire.cache.TimeoutException;
+ import com.gemstone.gemfire.cache.TransactionException;
+ import com.gemstone.gemfire.cache.TransactionId;
+ import com.gemstone.gemfire.cache.client.PoolManager;
+ import com.gemstone.gemfire.cache.client.ServerOperationException;
+ import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
+ import com.gemstone.gemfire.cache.client.internal.Connection;
+ import com.gemstone.gemfire.cache.client.internal.Endpoint;
+ import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+ import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
+ import com.gemstone.gemfire.cache.control.ResourceManager;
+ import com.gemstone.gemfire.cache.execute.Function;
+ import com.gemstone.gemfire.cache.execute.ResultCollector;
+ import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+ import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+ import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+ import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+ import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
+ import com.gemstone.gemfire.cache.query.FunctionDomainException;
+ import com.gemstone.gemfire.cache.query.Index;
+ import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
+ import com.gemstone.gemfire.cache.query.IndexType;
+ import com.gemstone.gemfire.cache.query.MultiIndexCreationException;
+ import com.gemstone.gemfire.cache.query.NameResolutionException;
+ import com.gemstone.gemfire.cache.query.QueryException;
+ import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
+ import com.gemstone.gemfire.cache.query.QueryService;
+ import com.gemstone.gemfire.cache.query.SelectResults;
+ import com.gemstone.gemfire.cache.query.TypeMismatchException;
+ import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+ import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+ import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
+ import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
+ import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
+ import com.gemstone.gemfire.cache.query.internal.index.IndexUtils;
+ import com.gemstone.gemfire.cache.util.ObjectSizer;
+ import com.gemstone.gemfire.cache.wan.GatewaySender;
+ import com.gemstone.gemfire.distributed.DistributedMember;
+ import com.gemstone.gemfire.distributed.DistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.DM;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+ import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+ import com.gemstone.gemfire.distributed.internal.DistributionStats;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.ClassLoadUtil;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.InternalStatisticsDisabledException;
+ import com.gemstone.gemfire.internal.NanoTimer;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
+ import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag;
+ import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
+ import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
+ import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
+ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
+ import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+ import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
+ import com.gemstone.gemfire.internal.cache.control.ResourceListener;
+ import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionExecutor;
+ import com.gemstone.gemfire.internal.cache.execute.DistributedRegionFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+ import com.gemstone.gemfire.internal.cache.execute.RegionFunctionContextImpl;
+ import com.gemstone.gemfire.internal.cache.execute.ServerToClientFunctionResultSender;
+ import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+ import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+ import com.gemstone.gemfire.internal.cache.partitioned.RedundancyAlreadyMetException;
+ import com.gemstone.gemfire.internal.cache.persistence.DiskExceptionHandler;
+ import com.gemstone.gemfire.internal.cache.persistence.DiskRecoveryStore;
+ import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView;
+ import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
+ import com.gemstone.gemfire.internal.cache.persistence.query.IndexMap;
+ import com.gemstone.gemfire.internal.cache.persistence.query.mock.IndexMapImpl;
+ import com.gemstone.gemfire.internal.cache.tier.InterestType;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientHealthMonitor;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientTombstoneMessage;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.HandShake;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionHolder;
+ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+ import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventCallbackArgument;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
+ import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
+ import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
+ import com.gemstone.gemfire.internal.util.concurrent.StoppableReadWriteLock;
+ import com.gemstone.gemfire.i18n.StringId;
+ 
+ /**
+  * Implementation of a locally scoped region. Note that this class has a different
+  * meaning starting with 3.0. In previous versions, a LocalRegion was the
+  * representation of a region in the VM. Starting with 3.0, a LocalRegion is a
+  * non-distributed region. The subclass DistributedRegion adds distribution
+  * behavior.
+  *
+  * @author Eric Zoerner
+  */
+ @SuppressWarnings("deprecation")
+ public class LocalRegion extends AbstractRegion 
+   implements LoaderHelperFactory, ResourceListener<MemoryEvent>,
+              DiskExceptionHandler, DiskRecoveryStore
+ {
+   private static final Logger logger = LogService.getLogger();
+   
+   /**
+    * Internal interface used to simulate failures when performing entry operations
+    * @author Mitch Thomas
+    * @since 5.7
+    */
+   public interface TestCallable {
+     public void call(LocalRegion r, Operation op, RegionEntry re);
+   }
+ 
+   // view types for iterators
+   public enum IteratorType {
+     KEYS, VALUES, ENTRIES
+   }
+ 
+   // initialization level
+   public static final int AFTER_INITIAL_IMAGE = 0;
+ 
+   public static final int BEFORE_INITIAL_IMAGE = 1;
+ 
+   public static final int ANY_INIT = 2;
+ 
+   /**
+    * thread local to indicate that this thread should bypass the initialization
+    * Latch
+    */
+   private static final ThreadLocal initializationThread = new ThreadLocal();
+ 
+   /* thread local to indicate it is for the persistence data conversion tool */
+   protected static final ThreadLocal isConversion = new ThreadLocal();
+   
+   // user attributes //
+   private Object regionUserAttribute;
+ 
+   // @todo darrel: shouldn't this be an identity map whose key is a RegionEntry?
+   protected Map entryUserAttributes;
+ 
+ 
+   private final String regionName;
+ 
+   protected final LocalRegion parentRegion;
+ 
+   // set to true only if isDestroyed is also true
+   // and region is about to be recreated due to reinitialization by loading
+   // of a snapshot, etc.
+   private volatile boolean reinitialized_old = false;
+ 
+   protected volatile boolean isDestroyed = false;
+ 
+   // In case of parallel WAN, when a destroy is called on the user PR, it waits for
+   // the parallel queue to drain and then destroys the parallel queue. During this time,
+   // if an operation such as a put happens on the user PR, it keeps building up the
+   // parallel queue and increases the time it takes for the user PR to be destroyed.
+   // This volatile boolean blocks such put operations by throwing RegionDestroyedException.
+   protected volatile boolean isDestroyedForParallelWAN = false;
+ 
+   // set to true after snapshot is loaded, to help get initial image
+   // make sure this is the right incarnation of this region
+   private volatile boolean reinitialized_new = false;
+ 
+   /** Lock used to prevent multiple concurrent destroy region operations */
+   private Semaphore destroyLock;
+ 
+   //guarded by regionExpiryLock.
+   private RegionTTLExpiryTask regionTTLExpiryTask = null;
+   //guarded by regionExpiryLock.
+   private RegionIdleExpiryTask regionIdleExpiryTask = null;
+   
+   private final Object regionExpiryLock = new Object();
+   // guarded by regionExpiryLock. Keeps track of how many txs are writing to this region.
+   private int txRefCount;
+ 
+   private final ConcurrentHashMap<RegionEntry, EntryExpiryTask> entryExpiryTasks = new ConcurrentHashMap<RegionEntry, EntryExpiryTask>();
+ 
+   /**
+    * Set to true after an invalidate region expiration so we don't get multiple
+    * expirations
+    */
+   volatile boolean regionInvalid = false;
+ 
+   public final RegionMap entries;
+ 
+   /**
+    * Set to true if this region supports transactions, false otherwise.
+    */
+   private final boolean supportsTX;
+ 
+   /** tracks threadID->seqno information for this region */
+   protected EventTracker eventTracker;
+   
+   /** tracks region-level version information for members.  See
+    * https://wiki.gemstone.com/display/gfe70/Consistency+in+Replicated+Regions+and+WAN
+    */
+   private RegionVersionVector versionVector;
+ 
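+   // These patterns appear to be used to decide whether a string handed to query() is
+   // already a complete SELECT or IMPORT statement, as opposed to a bare where-clause
+   // predicate that must still be wrapped in a full query against this region
+   // (hedged reading of the patterns below, not stated in the original javadoc).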
+   private static final Pattern[] QUERY_PATTERNS = new Pattern[] {
+       Pattern.compile("^\\(*select .*", Pattern.CASE_INSENSITIVE
+           | Pattern.UNICODE_CASE | Pattern.DOTALL),
+       Pattern.compile("^import .*", Pattern.CASE_INSENSITIVE
+           | Pattern.UNICODE_CASE | Pattern.DOTALL) };
+ 
+ 
+   public static final String EXPIRY_MS_PROPERTY = "gemfire.EXPIRY_UNITS_MS";
+   
+   /**
+    * Used by unit tests to set expiry to milliseconds instead of the default
+    * seconds. Used in ExpiryTask.
+    *
+    * @since 5.0
+    */
+   final boolean EXPIRY_UNITS_MS;
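+   // Illustrative usage (an assumption, not part of this commit): a unit test would
+   // typically enable millisecond expiry by setting the system property before the
+   // region is created, e.g.
+   //   System.setProperty(LocalRegion.EXPIRY_MS_PROPERTY, "true");
+   // since this flag is read via Boolean.getBoolean(EXPIRY_MS_PROPERTY) in the constructor.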
+ 
+   // Indicates that the entries are in fact initialized. It turns out
+   // you can't trust the assignment of a volatile (as indicated above)
+   // to mean that the thing being assigned is fully formed, only
+   // those things *before* the assignment are fully formed. mthomas 10/02/2005
+   private volatile boolean entriesInitialized;
+ 
+   /**
+    * Contains the subregions themselves. Marked volatile to make sure it is fully
+    * initialized before being accessed (actually should be final).
+    */
+   protected volatile ConcurrentMap subregions;
+ 
+   private final Object subregionsLock = new Object();
+ 
+   // Used for synchronizing access to client CQs
+ //  private final Object clientCqsSync = new Object();
+ 
+   /**
+    * Prevents access to this region until it is done initializing, except for
+    * some special initializing operations such as replying to create region
+    * messages. In JDK 1.5 we will use java.util.concurrent.CountDownLatch instead
+    * of com.gemstone.gemfire.internal.util.CountDownLatch.
+    */
+   protected final StoppableCountDownLatch initializationLatchBeforeGetInitialImage;
+ 
+   protected final StoppableCountDownLatch initializationLatchAfterGetInitialImage;
+ 
+   /**
+    * Used to hold off cache listener events until the afterRegionCreate is
+    * called
+    *
+    * @since 5.0
+    */
+   private final StoppableCountDownLatch afterRegionCreateEventLatch;
+ 
+   /**
+    * Set to true the first time isInitialized returns true.
+    */
+   private volatile boolean initialized = false; // added for bug 30223
+ 
+   /** Used for accessing region data on disk */
+   private final DiskRegion diskRegion;
+   
+   /**
+    * Used by transactions to suspend entry expiration while a transaction is in
+    * progress on a region. This field is only initialized if expiration is
+    * configured and transactions are possible.
+    */
+   private volatile StoppableReadWriteLock txExpirationLock;
+ 
+   /**
+    * Used for serializing netSearch and netLoad on a per-key basis.
+    * CM <Object, Future>
+    */
+   protected final ConcurrentMap getFutures = new ConcurrentHashMap();
+ 
+   /*
+    * Asif: This boolean needs to be made true if the test needs to receive a
+    * synchronous callback just after a clear on the map is done. Its visibility is
+    * default so that only tests present in com.gemstone.gemfire.internal.cache
+    * will be able to see it.
+    */
+   public static boolean ISSUE_CALLBACKS_TO_CACHE_OBSERVER = false;
+ 
+   /**
+    * A flag used to indicate that this Region is being used as an administrative
+    * Region, holding meta-data for a PartitionedRegion
+    */
+   final private boolean isUsedForPartitionedRegionAdmin;
+ 
+   final private boolean isUsedForPartitionedRegionBucket;
+ 
+   final private boolean isUsedForMetaRegion;
+   
+   final private boolean isMetaRegionWithTransactions;
+   
+   final private boolean isUsedForSerialGatewaySenderQueue;
+   
+   final private boolean isUsedForParallelGatewaySenderQueue;
+   
+   final private AbstractGatewaySender serialGatewaySender;
+ 
+   /**
+    * The factory used to create the LoaderHelper when a loader is invoked
+    */
+   protected final LoaderHelperFactory loaderHelperFactory;
+ 
+   /**
+    * Allow for different CachePerfStats locations... primarily for PartitionedRegions.
+    */
+   private final CachePerfStats cachePerfStats;
+   private final boolean hasOwnStats; 
+ 
+ 
+   private final ImageState imageState;
+   /**
+    * Register interest count to track if any register interest is in progress for
+    * this region. This count will be incremented when register interest starts
+    * and decremented when register interest finishes.
+    * @guarded.By {@link #imageState}
+    */
+   private int riCnt = 0; /*
+                           * since always written while holding an exclusive write lock
+                           * and only read while holding a read lock
+                           * it does not need to be atomic or
+                           * protected by any other sync.
+                           */
+ 
+ 
+   /**
+    * Map of subregion full paths to serial numbers. These are subregions that
+    * were destroyed when this region was destroyed. This map remains null until
+    * this region is destroyed.
+    */
+   private volatile HashMap destroyedSubregionSerialNumbers;
+ 
+   /**
+    * This boolean is true when a member who has this region is running low on memory.
+    * It is used to reject region operations.
+    */
+   public final AtomicBoolean memoryThresholdReached = new AtomicBoolean(false);
+ 
+   // Lock for updating PR MetaData on client side 
+   public final Lock clientMetaDataLock = new ReentrantLock();
+   
+   
+   protected HdfsRegionManager hdfsManager;
+   protected HoplogListenerForRegion hoplogListener;
+ 
+   /**
+    * There seem to be cases where a region can be created and yet the
+    * distributed system is not yet in place...
+    *
+    * @author jpenney
+    *
+    */
+   protected class Stopper extends CancelCriterion {
+ 
+     @Override
+     public String cancelInProgress() {
+       // ---
+       // This grossness is necessary because there are instances where the
+       // region can exist without having a cache (XML creation)
+       checkFailure();
+       Cache c = LocalRegion.this.getCache();
+       if (c == null) {
+         return LocalizedStrings.LocalRegion_THE_CACHE_IS_NOT_AVAILABLE.toLocalizedString();
+       }
+       // --- end of grossness
+       return c.getCancelCriterion().cancelInProgress();
+     }
+ 
+     /* (non-Javadoc)
+      * @see com.gemstone.gemfire.CancelCriterion#generateCancelledException(java.lang.Throwable)
+      */
+     @Override
+     public RuntimeException generateCancelledException(Throwable e) {
+       // ---
+       // This grossness is necessary because there are instances where the
+       // region can exist without having a cache (XML creation)
+       checkFailure();
+       Cache c = LocalRegion.this.getCache();
+       if (c == null) {
+         return new CacheClosedException("No cache", e);
+       }
+       // --- end of grossness
+       return c.getCancelCriterion().generateCancelledException(e);
+     }
+ 
+   }
+ 
+   protected final CancelCriterion stopper = createStopper();
+ 
+   protected CancelCriterion createStopper() {
+     return new Stopper();
+   }
+ 
+   private final TestCallable testCallable;
+ 
+   /**
+    * ThreadLocal used to set the current region being initialized.
+    * 
+    * Currently used by the OpLog layer to initialize the
+    * {@link KeyWithRegionContext} if required.
+    */
+   private final static ThreadLocal<LocalRegion> initializingRegion =
+     new ThreadLocal<LocalRegion>();
+ 
+   /**
+    * Set to true if the region contains keys implementing
+    * {@link KeyWithRegionContext} that require setting up of region specific
+    * context after deserialization or recovery from disk.
+    */
+   private boolean keyRequiresRegionContext;
+ 
+   /**
+    * Get the current initializing region as set in the ThreadLocal.
+    * 
+    * Note that this value is cleared after the initialization of LocalRegion is
+    * done so is valid only for the duration of region creation and
+    * initialization.
+    */
+   public static LocalRegion getInitializingRegion() {
+     return initializingRegion.get();
+   }
+ 
+   /**
+    * Return true if the keys of this region implement
+    * {@link KeyWithRegionContext} that require region specific context
+    * initialization after deserialization or recovery from disk.
+    * 
+    * Currently used by SQLFabric for the optimized
+    * <code>CompactCompositeRegionKey</code> that points to the raw row bytes and
+    * so requires a handle to table schema for interpretation of those bytes.
+    */
+   public final boolean keyRequiresRegionContext() {
+     return this.keyRequiresRegionContext;
+   }
+ 
+   /**
+    * Set the {@link #keyRequiresRegionContext} flag to given value.
+    */
+   public final void setKeyRequiresRegionContext(boolean v) {
+     this.keyRequiresRegionContext = v;
+   }
+ 
+   public CancelCriterion getCancelCriterion() {
+     return this.stopper;
+   }
+ 
+   ////////////////// Public Methods ///////////////////////////////////////////
+ 
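+   // A quick sketch of the paths the helper below produces, assuming SEPARATOR is "/"
+   // (the region names and parent region used here are purely illustrative):
+   //   calcFullPath("orders", null)          -> "/orders"
+   //   calcFullPath("pending", ordersRegion) -> "/orders/pending"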
+   static String calcFullPath(String regionName, LocalRegion parentRegion) {
+     StringBuilder buf = null;
+     if (parentRegion == null) {
+       buf = new StringBuilder(regionName.length() + 1);
+     }
+     else {
+       String parentFull = parentRegion.getFullPath();
+       buf = new StringBuilder(parentFull.length() + regionName.length() + 1);
+       buf.append(parentFull);
+     }
+     buf.append(SEPARATOR).append(regionName);
+     return buf.toString();
+   }
+ 
+   /**
+    * Creates a new region.
+    */
+   protected LocalRegion(String regionName, RegionAttributes attrs,
+       LocalRegion parentRegion, GemFireCacheImpl cache,
+       InternalRegionArguments internalRegionArgs) throws DiskAccessException {
+     super(cache, attrs,regionName, internalRegionArgs);
+     // Initialized here (and defers to parent) to fix GEODE-128
+     this.EXPIRY_UNITS_MS = parentRegion != null ? parentRegion.EXPIRY_UNITS_MS : Boolean.getBoolean(EXPIRY_MS_PROPERTY);
+ 
+     Assert.assertTrue(regionName != null, "regionName must not be null");
+     this.sharedDataView = buildDataView();
+     this.regionName = regionName;
+     this.parentRegion = parentRegion;
+     this.fullPath = calcFullPath(regionName, parentRegion);
+ 
+     String myName = getFullPath();
+     if (internalRegionArgs.getPartitionedRegion() != null) {
+       myName = internalRegionArgs.getPartitionedRegion().getFullPath();
+     }
+     this.offHeap = attrs.getOffHeap() || Boolean.getBoolean(myName+":OFF_HEAP");
+     if (getOffHeap()) {
+       if (cache.getOffHeapStore() == null) {
+         throw new IllegalStateException(LocalizedStrings.
+             LocalRegion_THE_REGION_0_WAS_CONFIGURED_TO_USE_OFF_HEAP_MEMORY_BUT_OFF_HEAP_NOT_CONFIGURED.toLocalizedString(myName));
+       }
+     }
+     
+     this.initializationLatchBeforeGetInitialImage = new StoppableCountDownLatch(this.stopper, 1);
+     this.initializationLatchAfterGetInitialImage = new StoppableCountDownLatch(this.stopper, 1);
+     this.afterRegionCreateEventLatch = new StoppableCountDownLatch(this.stopper, 1);
+ 
+     // set the user-attribute object upfront for SQLFabric
+     if (internalRegionArgs.getUserAttribute() != null) {
+       setUserAttribute(internalRegionArgs.getUserAttribute());
+     }
+     setKeyRequiresRegionContext(internalRegionArgs.keyRequiresRegionContext());
+     initializingRegion.set(this);
+ 
+     if (internalRegionArgs.getCachePerfStatsHolder() != null) {
+       this.hasOwnStats = false;
+       this.cachePerfStats = internalRegionArgs.getCachePerfStatsHolder()
+           .getCachePerfStats();
+     }
+     else {
+       if (attrs.getPartitionAttributes() != null || isInternalRegion()
+           || internalRegionArgs.isUsedForMetaRegion()) {
+         this.hasOwnStats = false;
+         this.cachePerfStats = cache.getCachePerfStats();
+       }
+       else {
+         this.hasOwnStats = true;
+         this.cachePerfStats = new RegionPerfStats(cache, cache.getCachePerfStats(), regionName);
+       }
+     }
+ 
+     this.hdfsManager = initHDFSManager();
+     this.dsi = findDiskStore(attrs, internalRegionArgs);
+     this.diskRegion = createDiskRegion(internalRegionArgs);
+     this.entries = createRegionMap(internalRegionArgs);
+     this.entriesInitialized = true;
+     this.subregions = new ConcurrentHashMap();
+     // we only need a destroy lock if this is a root
+     if (parentRegion == null) {
+       initRoot();
+     }
+     if (internalRegionArgs.getLoaderHelperFactory() != null) {
+       this.loaderHelperFactory = internalRegionArgs.getLoaderHelperFactory();
+     }
+     else {
+       this.loaderHelperFactory = this;
+     }
+ 
+     this.isUsedForPartitionedRegionAdmin = internalRegionArgs
+         .isUsedForPartitionedRegionAdmin();
+     this.isUsedForPartitionedRegionBucket = internalRegionArgs
+         .isUsedForPartitionedRegionBucket();
+     this.isUsedForMetaRegion = internalRegionArgs
+         .isUsedForMetaRegion();
+     this.isMetaRegionWithTransactions = internalRegionArgs.isMetaRegionWithTransactions();
+     this.isUsedForSerialGatewaySenderQueue = internalRegionArgs.isUsedForSerialGatewaySenderQueue();
+     this.isUsedForParallelGatewaySenderQueue = internalRegionArgs.isUsedForParallelGatewaySenderQueue();
+     this.serialGatewaySender = internalRegionArgs.getSerialGatewaySender();
+     
+     if (!isUsedForMetaRegion && !isUsedForPartitionedRegionAdmin
+         && !isUsedForPartitionedRegionBucket
+         && !isUsedForSerialGatewaySenderQueue
+         && !isUsedForParallelGatewaySenderQueue) {
+       this.filterProfile = new FilterProfile(this);
+     }
+     
+     // initialize client to server proxy
+     this.srp = (this.getPoolName() != null)
+       ? new ServerRegionProxy(this)
+       : null;
+     this.imageState =
+       new UnsharedImageState(this.srp != null,
+                              getDataPolicy().withReplication() || getDataPolicy().isPreloaded(),
+                              getAttributes().getDataPolicy().withPersistence(),
+                              this.stopper);
+ 
+     createEventTracker();
+ 
+     // prevent internal regions from participating in a TX, bug 38709
+     this.supportsTX = !isSecret() && !isUsedForPartitionedRegionAdmin()
+         && !isUsedForMetaRegion() || isMetaRegionWithTransactions();
+ 
+     this.testCallable = internalRegionArgs.getTestCallable();
+     
+   }
+ 
+   private HdfsRegionManager initHDFSManager() {
+     HdfsRegionManager hdfsMgr = null;
+     if (this.getHDFSStoreName() != null) {
+       this.hoplogListener = new HoplogListenerForRegion();
+       HDFSRegionDirector.getInstance().setCache(cache);
+       hdfsMgr = HDFSRegionDirector.getInstance().manageRegion(this, 
+           this.getHDFSStoreName(), hoplogListener);
+     }
+     return hdfsMgr;
+   }
+ 
+   private RegionMap createRegionMap(InternalRegionArguments internalRegionArgs) {
+     RegionMap result = null;
+     if ((internalRegionArgs.isReadWriteHDFSRegion()) && this.diskRegion != null) {
+       this.diskRegion.setEntriesMapIncompatible(true);
+     }
+     if (this.diskRegion != null) {
+       result = this.diskRegion.useExistingRegionMap(this);
+     }
+     if (result == null) {
+       RegionMap.Attributes ma = new RegionMap.Attributes();
+       ma.statisticsEnabled = this.statisticsEnabled;
+       ma.loadFactor = this.loadFactor;
+       ma.initialCapacity = this.initialCapacity;
+       ma.concurrencyLevel = this.concurrencyLevel;
+       result = RegionMapFactory.createVM(this, ma, internalRegionArgs);
+     }
+     return result;
+   }
+   
+   protected InternalDataView buildDataView() {
+     return new LocalRegionDataView();
+   }
+ 
+   /**
+    * initialize the event tracker.  Not all region implementations want or
+    * need one of these. Regions that require one should reimplement this method
+    * and create one like so:
+    * <code><pre>
+    *     this.eventTracker = new EventTracker(this.cache);
+    *     this.eventTracker.start();
+    * </pre></code>
+    */
+   void createEventTracker() {
+     // if LocalRegion is changed to have an event tracker, then the initialize()
+     // method should be changed to set it to "initialized" state when the
+     // region finishes initialization
+   }
+   
+   
+   /**
+    * Test method for getting the event tracker.
+    * 
+    * This method is for testing only.  Other region classes may track events using
+    * different mechanisms than EventTrackers.
+    */
+   protected EventTracker getEventTracker() {
+     return this.eventTracker;
+   }
+   
+   /** Returns the region's version vector. */
+   public RegionVersionVector getVersionVector() {
+     return this.versionVector;
+   }
+   
+   /** returns object used to guard the size() operation during tombstone removal */
+   public Object getSizeGuard() {
+     if (!this.concurrencyChecksEnabled) {
+       return new Object();
+     } else {
+       return this.fullPath; // avoids creating another sync object - could be anything unique to this region
+     }
+   }
+   
+   /** initializes a new version vector for this region */
+   protected void createVersionVector() {
+     
+     this.versionVector = RegionVersionVector.create(getVersionMember(), this);
+     
+     if (dataPolicy.withPersistence()) {
+       //copy the versions that we have recovered from disk into
+       //the version vector.
+       RegionVersionVector diskVector = this.diskRegion.getRegionVersionVector();
+       this.versionVector.recordVersions(diskVector.getCloneForTransmission());
+     } else if (!dataPolicy.withStorage()) {
+       // version vectors are currently only necessary in empty regions for
+       // tracking canonical member IDs
+       this.versionVector.turnOffRecordingForEmptyRegion();
+     }
+     if (this.srp != null) {
+       this.versionVector.setIsClientVector();
+     }
+     this.cache.getDistributionManager().addMembershipListener(this.versionVector);
+   }
+   
+   @Override
+   protected void updateEntryExpiryPossible()
+   {
+     super.updateEntryExpiryPossible();
+     if (!isEntryExpiryPossible()) {
+       // since expiration is no longer possible cleanup the tasks
+       cancelAllEntryExpiryTasks();
+     }
+   }
+ 
+   public IndexUpdater getIndexUpdater() {
+     return this.entries.getIndexUpdater();
+   }
+ 
+   boolean isCacheClosing()
+   {
+     return this.cache.isClosed();
+   }
+ 
+   public RegionEntry getRegionEntry(Object key) {
+     return this.entries.getEntry(key);
+   }
+   
+   /**
+    * Test hook - returns the version stamp for an entry in the form of a
+    * version tag
+    * @param key
+    * @return the entry version information
+    */
+   public VersionTag getVersionTag(Object key) {
+     Region.Entry entry = getEntry(key, true);
+     VersionTag tag = null;
+     if (entry != null && entry instanceof EntrySnapshot) {
+       tag = ((EntrySnapshot)entry).getVersionTag();
+     } else if (entry != null && entry instanceof NonTXEntry) {
+       tag = ((NonTXEntry)entry).getRegionEntry().getVersionStamp().asVersionTag();
+     }
+     return tag;
+   }
+   
+   /** Removes any destroyed entries from the region and clears the destroyedKeys set.
+    * Assert: the caller must be holding the write lock on the ImageState ("is").
+    */
+   private void destroyEntriesAndClearDestroyedKeysSet() {
+     ImageState is = getImageState();
+     Iterator iter = is.getDestroyedEntries();
+     while (iter.hasNext()) {
+       Object key = iter.next();
+       // destroy the entry which has value Token.DESTROYED
+       // If it is Token.DESTROYED then only destroy it.
+       this.entries.removeIfDestroyed(key); // fixes bug 41957
+     }
+   }
+ 
+   /**
+    * @since 5.7
+    */
+   protected final ServerRegionProxy srp;
+ 
+   private final InternalDataView sharedDataView;
+ 
+   public final ServerRegionProxy getServerProxy() {
+     return this.srp;
+   }
+   
+   public final boolean hasServerProxy() {
+     return this.srp != null;
+   }
+ 
+   /** Returns true if the ExpiryTask is currently allowed to expire. */
+   protected boolean isExpirationAllowed(ExpiryTask expiry)
+   {
+     return true;
+   }
+ 
+   void performExpiryTimeout(ExpiryTask p_task) throws CacheException
+   {
+     if (p_task != null) {
+       p_task.basicPerformTimeout(false);
+     }
+   }
+ 
+   private void initRoot()
+   {
+     this.destroyLock = new Semaphore(1);
+   }
+ 
+   public void handleMarker() {
+     
+     RegionEventImpl event = new RegionEventImpl(this,
+         Operation.MARKER, null, false, getMyId(),
+         false /* generate EventID */);
+ 
+     dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_LIVE, event);
+   }
+ 
+   public AttributesMutator getAttributesMutator()
+   {
+     checkReadiness();
+     return this;
+   }
+ 
+   public Region createSubregion(String subregionName,
+       RegionAttributes regionAttributes) throws RegionExistsException,
+       TimeoutException
+   {
+     try {
+       return createSubregion(subregionName, regionAttributes,
+           new InternalRegionArguments().setDestroyLockFlag(true)
+               .setRecreateFlag(false));
+     }
+     catch (IOException e) {
+       // only happens when loading a snapshot, not here
+       InternalGemFireError assErr = new InternalGemFireError(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION.toLocalizedString());
+       assErr.initCause(e);
+       throw assErr;
+ 
+     }
+     catch (ClassNotFoundException e) {
+       // only happens when loading a snapshot, not here
+       InternalGemFireError assErr = new InternalGemFireError(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION.toLocalizedString());
+       assErr.initCause(e);
+       throw assErr;
+ 
+     }
+   }
+ 
+   /**
+    * Returns the member id of my distributed system
+    *
+    * @since 5.0
+    */
+   @Override
+   protected InternalDistributedMember getMyId()
+   {
+     return this.cache.getMyId();
+   }
+   
+   public VersionSource getVersionMember()
+   {
+     if(dataPolicy.withPersistence()) {
+       return getDiskStore().getDiskStoreID();
+     } else {
+       return this.cache.getMyId();
+     }
+   }
+ 
+   public Region createSubregion(String subregionName,
+       RegionAttributes attrs,
+       InternalRegionArguments internalRegionArgs) throws RegionExistsException,
+       TimeoutException, IOException, ClassNotFoundException
+   {
+     checkReadiness();
+     LocalRegion newRegion = null;
+     RegionAttributes regionAttributes = attrs;
+     attrs = cache.invokeRegionBefore(this, subregionName, attrs, internalRegionArgs);
+     final InputStream snapshotInputStream = internalRegionArgs
+         .getSnapshotInputStream();
+     final boolean getDestroyLock = internalRegionArgs.getDestroyLockFlag();
+     final InternalDistributedMember imageTarget = internalRegionArgs
+         .getImageTarget();
+     try {
+       if (getDestroyLock)
+         acquireDestroyLock();
+       LocalRegion existing = null;
+       try {
+         if (isDestroyed()) {
+           if (this.reinitialized_old) {
+             throw new RegionReinitializedException(toString(), getFullPath());
+           }
+           throw new RegionDestroyedException(toString(), getFullPath());
+         }
+         validateRegionName(subregionName);
+ 
+         validateSubregionAttributes(regionAttributes);
+         String regionPath = calcFullPath(subregionName, this);
+ 
+         // lock down the subregionsLock
+         // to prevent other threads from adding a region to it in toRegion
+         // but don't wait on initialization while synchronized (distributed
+         // deadlock)
+         synchronized (this.subregionsLock) {
+           
+           existing = (LocalRegion)this.subregions.get(subregionName);
+ 
+           if (existing == null) {
+             // create the async queue for HDFS if required. 
+             HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath,
+                 regionAttributes, this.cache);
+             regionAttributes = cache.setEvictionAttributesForLargeRegion(
+                 regionAttributes);
+             if (regionAttributes.getScope().isDistributed()
+                 && internalRegionArgs.isUsedForPartitionedRegionBucket()) {
+               final PartitionedRegion pr = internalRegionArgs
+                   .getPartitionedRegion();
+               internalRegionArgs.setIndexUpdater(pr.getIndexUpdater());
+               internalRegionArgs.setUserAttribute(pr.getUserAttribute());
+               internalRegionArgs.setKeyRequiresRegionContext(pr
+                   .keyRequiresRegionContext());
+               if (pr.isShadowPR()) {
+                 if (!pr.isShadowPRForHDFS()) {
+                     newRegion = new BucketRegionQueue(subregionName, regionAttributes,
+                       this, this.cache, internalRegionArgs);
+                 }
+                 else {
+                    newRegion = new HDFSBucketRegionQueue(subregionName, regionAttributes,
+                       this, this.cache, internalRegionArgs);
+                 }
+                 
+               } else {
+                 newRegion = new BucketRegion(subregionName, regionAttributes,
+                     this, this.cache, internalRegionArgs);  
+               }
+             }
+             else if (regionAttributes.getPartitionAttributes() != null) {
+               newRegion = new PartitionedRegion(subregionName,
+                   regionAttributes, this, this.cache,  internalRegionArgs);
+             }
+             else {
+               boolean local = regionAttributes.getScope().isLocal();
+               newRegion = local ? new LocalRegion(subregionName,
+                   regionAttributes, this, this.cache, internalRegionArgs)
+                   : new DistributedRegion(subregionName, regionAttributes,
+                       this, this.cache, internalRegionArgs);
+             }
+             Object o = this.subregions.putIfAbsent(subregionName, newRegion);
+ 
+             Assert.assertTrue(o == null);
+ 
+             Assert.assertTrue(!newRegion.isInitialized());
+ 
+             //
+             if (logger.isDebugEnabled()) {
+               logger.debug("Subregion created: {}", newRegion.getFullPath());
+             }
+             if (snapshotInputStream != null || imageTarget != null
+                 || internalRegionArgs.getRecreateFlag()) {
+               this.cache.regionReinitialized(newRegion); // fix for bug 33534
+             }
+ 
+           } // endif: existing == null
+         } // end synchronization
+       }
+       finally {
+         if (getDestroyLock)
+           releaseDestroyLock();
+       }
+       
+       //Fix for bug 42127 - moved to outside of the destroy lock.
+       if (existing != null) {
+         // now outside of synchronization we must wait for appropriate
+         // initialization on existing region before returning a reference to
+         // it
+         existing.waitOnInitialization();
+         // fix for bug 32570
+         throw new RegionExistsException(existing);
+       }
+ 
+       
+       boolean success = false;
+       try {
+         newRegion.checkReadiness();
+         this.cache.setRegionByPath(newRegion.getFullPath(), newRegion);
+         if (regionAttributes instanceof UserSpecifiedRegionAttributes){
+           internalRegionArgs.setIndexes((
+             (UserSpecifiedRegionAttributes)regionAttributes).getIndexes());  
+         }
+         newRegion.initialize(snapshotInputStream, imageTarget, internalRegionArgs); // releases initialization Latches
+         //register the region with resource manager to get memory events
+         if(!newRegion.isInternalRegion()){
+           if (!newRegion.isDestroyed) {
+             cache.getResourceManager().addResourceListener(ResourceType.MEMORY, newRegion);
+             
+             if (!newRegion.getOffHeap()) {
+               newRegion.initialCriticalMembers(cache.getResourceManager().getHeapMonitor().getState().isCritical(), cache
+                   .getResourceAdvisor().adviseCritialMembers());
+             } else {
+               newRegion.initialCriticalMembers(cache.getResourceManager().getHeapMonitor().getState().isCritical()
+                   || cache.getResourceManager().getOffHeapMonitor().getState().isCritical(), cache.getResourceAdvisor()
+                   .adviseCritialMembers());
+             }
+ 
+             // synchronization would be done on ManagementAdapter.regionOpLock
+             // instead of destroyLock in LocalRegion? ManagementAdapter is one
+             // of the Resource Event listeners            
+             
+             InternalDistributedSystem system = this.cache
+                 .getDistributedSystem();
+             system.handleResourceEvent(ResourceEvent.REGION_CREATE, newRegion);
+           }
+         }
+         success = true;
+       } catch (CancelException | RegionDestroyedException | RedundancyAlreadyMetException e) {
+         // don't print a call stack
+         throw e;
+       } catch (final RuntimeException validationException) {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.LocalRegion_INITIALIZATION_FAILED_FOR_REGION_0,
+             getFullPath()), validationException);
+         throw validationException;
+       }
+       finally {
+         if (!success) {
+           this.cache.setRegionByPath(newRegion.getFullPath(), null);
+           initializationFailed(newRegion);
+           cache.getResourceManager(false).removeResourceListener(newRegion);
+         }
+       }
+ 
+       newRegion.postCreateRegion();
+     }
+     finally {
+       // make sure region initialization latch is open regardless
+       // before returning;
+       // if the latch is not open at this point, then an exception must
+       // have occurred
+       if (newRegion != null && !newRegion.isInitialized()) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("Region initialize latch is closed, Error must have occurred");
+         }
+       }
+     }
+ 
+     cache.invokeRegionAfter(newRegion);
+     return newRegion;
+   }
+ 
+   public final void create(Object key, Object value, Object aCallbackArgument)
+       throws TimeoutException, EntryExistsException, CacheWriterException {
+     long startPut = CachePerfStats.getStatTime();
+     EntryEventImpl event = newCreateEntryEvent(key, value, aCallbackArgument);
+     validatedCreate(event, startPut);
+     // TODO OFFHEAP: validatedCreate calls freeOffHeapResources
+   }
+ 
+   public final void validatedCreate(EntryEventImpl event, long startPut)
+       throws TimeoutException, EntryExistsException, CacheWriterException {
+ 
+     try {
+       if (event.getEventId() == null && generateEventID()) {
+         event.setNewEventId(cache.getDistributedSystem());
+       }
+       assert event.isFetchFromHDFS() : "validatedPut() should have been called";
+       // Fix for 42448 - Only make create with null a local invalidate for
+       // normal regions. Otherwise, it will become a distributed invalidate.
+       if (getDataPolicy() == DataPolicy.NORMAL) {
+         event.setLocalInvalid(true);
+       }
+       discoverJTA();
+       if (!basicPut(event, true, // ifNew
+           false, // ifOld
+           null, // expectedOldValue
+           true // requireOldValue TODO txMerge why is oldValue required for
+                // create? I think so that the EntryExistsException will have it.
+       )) {
+         throw new EntryExistsException(event.getKey().toString(),
+             event.getOldValue());
+       } else {
+         if (!getDataView().isDeferredStats()) {
+           getCachePerfStats().endPut(startPut, false);
+         }
+       }
+     } finally {
+ 
+       event.release();
+ 
+     }
+   }
+ 
+   // split into a separate newCreateEntryEvent since SQLFabric may need to
+   // manipulate event before doing the put (e.g. posDup flag)
+   public final EntryEventImpl newCreateEntryEvent(Object key, Object value,
+       Object aCallbackArgument) {
+ 
+     validateArguments(key, value, aCallbackArgument);
+     checkReadiness();
+     checkForLimitedOrNoAccess();
+ 
+     return EntryEventImpl.create(this, Operation.CREATE, key,
+         value, aCallbackArgument, false, getMyId())
+         /* to distinguish genuine create */.setCreate(true);
+   }
+ 
+   /**
+    * The default Region implementation will generate an EventID in the EntryEvent
+    * object. This method is overridden in special Region objects like HARegion
+    * or SingleWriteSingleReadRegionQueue.SingleReadWriteMetaRegion to return
+    * false, as event propagation from those regions does not need EventID
+    * objects.
+    *
+    * <p>author Asif
+    * @return boolean indicating whether to generate eventID or not
+    */
+   @Override
+   public boolean generateEventID()
+   {     
+     return !(isUsedForPartitionedRegionAdmin()
+         || isUsedForPartitionedRegionBucket() );
+   }
+ 
+   public final Object destroy(Object key, Object aCallbackArgument)
+       throws TimeoutException, EntryNotFoundException, CacheWriterException {
+     EntryEventImpl event = newDestroyEntryEvent(key, aCallbackArgument);
+     return validatedDestroy(key, event);
+     // TODO OFFHEAP: validatedDestroy calls freeOffHeapResources
+   }
+ 
+   /**
+    * Destroys entry without performing validations. Call this after validating
+    * key, callback arg, and runtime state.
+    */
+   public Object validatedDestroy(Object key, EntryEventImpl event)
+       throws TimeoutException, EntryNotFoundException, CacheWriterException
+  {
+     try {
+       if (event.getEventId() == null && generateEventID()) {
+         event.setNewEventId(cache.getDistributedSystem());
+       }
+       basicDestroy(event, true, // cacheWrite
+           null); // expectedOldValue
+       if (event.isOldValueOffHeap()) {
+         return null;
+       } else {
+         return handleNotAvailable(event.getOldValue());
+       }
+     } finally {
+       event.release();
+     }
+   }
+ 
+   // split into a separate newDestroyEntryEvent since SQLFabric may need to
+   // manipulate event before doing the put (e.g. posDup flag)
+   public final EntryEventImpl newDestroyEntryEvent(Object key,
+       Object aCallbackArgument) {
+     validateKey(key);
+     validateCallbackArg(aCallbackArgument);
+     checkReadiness();
+     checkForLimitedOrNoAccess();
+ 
+     return EntryEventImpl.create(this, Operation.DESTROY, key,
+         null/* newValue */, aCallbackArgument, false, getMyId());
+   }
+ 
+   public void destroyRegion(Object aCallbackArgument)
+       throws CacheWriterException, TimeoutException
+   {
+     getDataView().checkSupportsRegionDestroy();
+     checkForLimitedOrNoAccess();
+ 
+     RegionEventImpl event = new RegionEventImpl(this, Operation.REGION_DESTROY,
+         aCallbackArgument, false, getMyId(), generateEventID());
+     basicDestroyRegion(event, true);
+   }
+ 
+   public InternalDataView getDataView() {
+     final TXStateInterface tx = getTXState();
+     if (tx == null) {
+       return this.sharedDataView;
+     }
+     return tx;
+   }
+ 
+   /**
+    * Fetch the de-serialized value from non-transactional state.
+    * @param keyInfo to which the value is associated
+    * @param updateStats true if the entry stats should be updated.
+    * @param disableCopyOnRead if true then disable copy on read
+    * @param preferCD true if the preferred result form is CachedDeserializable
+    * @param clientEvent client's event, if any (for version tag retrieval)
+    * @param returnTombstones whether destroyed entries should be returned
+    * @param retainResult if true then the result may be a retained off-heap reference
+    * @return the value for the given key
+    */
+   public final Object getDeserializedValue(RegionEntry re, final KeyInfo keyInfo, final boolean updateStats, boolean disableCopyOnRead, 
+   boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
+     if (this.diskRegion != null) {
+       this.diskRegion.setClearCountReference();
+     }
+     try {
+       if (re == null) {
+         if (allowReadFromHDFS) {
+           re = this.entries.getEntry(keyInfo.getKey());
+         } else {
+           re = this.entries.getOperationalEntryInVM(keyInfo.getKey());
+         }
+       }
+       //skip updating the stats if the value is null
+       // TODO - We need to clean up the callers of this class so that we can
+       // update the statistics here, where it would make more sense.
+       if (re == null) {
+         return null;
+       }
+       final Object value;
+       if (clientEvent != null && re.getVersionStamp() != null) {
+         // defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
+         final boolean disabled = this.entries.disableLruUpdateCallback();
+         try {
+         synchronized(re) { // bug #51059 value & version must be obtained atomically
+           clientEvent.setVersionTag(re.getVersionStamp().asVersionTag());
+           value = getDeserialized(re, updateStats, disableCopyOnRead, preferCD, retainResult);
+         }
+         } finally {
+           if (disabled) {
+             this.entries.enableLruUpdateCallback();
+           }
+           try {
+             this.entries.lruUpdateCallback();
+           }catch( DiskAccessException dae) {
+             this.handleDiskAccessException(dae);
+             throw dae;
+           }
+         }
+       } else {
+         value = getDeserialized(re, updateStats, disableCopyOnRead, preferCD, retainResult);
+       }
+       if (logger.isTraceEnabled() && !(this instanceof HARegion)) {
+         logger.trace("getDeserializedValue for {} returning version: {} returnTombstones: {} value: {}",
+             keyInfo.getKey(), (re.getVersionStamp()==null? "null" : re.getVersionStamp().asVersionTag()), returnTombstones, value);
+       }
+       return value;
+     }
+     finally {
+       if (this.diskRegion != null) {
+         this.diskRegion.removeClearCountReference();
+       }
+     }
+   }
+ 
+   /**
+    *
+    * @param re
+    * @param updateStats
+    * @param disableCopyOnRead if true then do not make a copy on read
+    * @param preferCD true if the preferred result form is CachedDeserializable
+    * @param retainResult if true then the result may be a retained off-heap reference
+    * @return the value found, which can be
+    *  <ul>
+    *    <li> null if the value was removed from the region entry
+    *    <li>Token.INVALID if the value of the region entry is invalid
+    *    <li>Token.LOCAL_INVALID if the value of the region entry is local invalid
+    *  </ul>
+    */
+   @Retained
+   protected final Object getDeserialized(RegionEntry re, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, boolean retainResult) {
+     assert !retainResult || preferCD;
+     try {
+       @Retained Object v = null;
+       try {
+         if (retainResult) {
+           v = re.getValueRetain(this);
+         } else {
+           v = re.getValue(this);
+         }
+       } catch(DiskAccessException dae) {
+         this.handleDiskAccessException(dae);
+         throw dae;
+       }
+   
+       //skip updating the stats if the value is null
+       if (v == null) {
+         return null;
+       }
+       if (v instanceof CachedDeserializable) {
+         if (!preferCD) {
+           if (isCopyOnRead()) {
+             if (disableCopyOnRead) {
+               v = ((CachedDeserializable)v).getDeserializedForReading();
+             } else {
+               v = ((CachedDeserializable)v).getDeserializedWritableCopy(this, re);
+             }
+           } else {
+             v = ((CachedDeserializable)v).getDeserializedValue(this, re);
+           }
+         }
+       }
+       else if (!disableCopyOnRead) {
+           v = conditionalCopy(v);
+       }
+   
+       if (updateStats) {
+         updateStatsForGet(re, v != null && !Token.isInvalid(v));
+       }
+       return v;
+     } catch (IllegalArgumentException i) {
+       IllegalArgumentException iae = new IllegalArgumentException(LocalizedStrings.DONT_RELEASE.toLocalizedString("Error while deserializing value for key="+re.getKey()));
+       iae.initCause(i);
+       throw iae;
+     }
+   }
+   
+   @Override
+   public Object get(Object key, Object aCallbackArgument,
+       boolean generateCallbacks, EntryEventImpl clientEvent) throws TimeoutException, CacheLoaderException
+   {
+     Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false, true/*allowReadFromHDFS*/);
+     if (Token.isInvalid(result)) {
+       result = null;
+     }
+     return result;
+   }
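
For callers outside this class, the Token.isInvalid filtering above means an invalid entry looks the same as a missing one. A minimal sketch of that behaviour against the public Region API (illustrative only, not part of this patch; the helper class and generic types are assumed):

    import com.gemstone.gemfire.cache.Region;

    final class GetExample {
      /** Returns the value for key, or null when there is no entry or the entry is INVALID. */
      static Object readOrNull(Region<String, String> region, String key) {
        // mirrors the override above: Token.INVALID never escapes to the caller
        return region.get(key);
      }
    }
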
+   
+   /**
+    * @see BucketRegion#getSerialized(KeyInfo, boolean, boolean)
+    */
+   public Object get(Object key, Object aCallbackArgument,
+       boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException {
+     return get(key, aCallbackArgument,
+         generateCallbacks, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS, false);
+   }
+   
+   /**
+    * The result of this operation may be an off-heap reference that the caller must release
+    */
+   @Retained
+   public Object getRetained(Object key, Object aCallbackArgument,
+       boolean generateCallbacks, boolean disableCopyOnRead,
+       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException {
+     return getRetained(key, aCallbackArgument,
+               generateCallbacks, disableCopyOnRead, requestingClient, clientEvent, returnTombstones, false);
+   }
+ 
+   /**
+    * The result of this operation may be an off-heap reference that the caller must release.
+    * @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local
+    */
+   @Retained
+   public Object getRetained(Object key, Object aCallbackArgument,
+       boolean generateCallbacks, boolean disableCopyOnRead,
+       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException {
+     return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, true, false);
+   }
+   /**
+    * @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local
+    * @param retainResult if true then the result may be a retained off-heap reference.
+    */
+   public Object get(Object key, Object aCallbackArgument,
+       boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones,
+       boolean opScopeIsLocal, boolean allowReadFromHDFS, boolean retainResult) throws TimeoutException, CacheLoaderException
+   {
+     assert !retainResult || preferCD;
+     validateKey(key);
+     validateCallbackArg(aCallbackArgument);
+     checkReadiness();
+     checkForNoAccess();
+     discoverJTA();
+     CachePerfStats stats = getCachePerfStats();
+     long start = stats.startGet();
+     boolean isMiss = true;
+     try {
+       KeyInfo keyInfo = getKeyInfo(key, aCallbackArgument);
+       Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
+       final boolean isCreate = value == null;
+       isMiss = value == null || Token.isInvalid(value)
+           || (!returnTombstones && value == Token.TOMBSTONE);
+       // Note: if the value was Token.DESTROYED then getDeserialized
+       // returns null so we don't need the following in the above expression:
+       // || (isRegInterestInProgress() && Token.isDestroyed(value))
+       // because (value == null) will be true in this case.
+       if (isMiss) {
+         // to fix bug 51509 raise the precedence of opScopeIsLocal
+         // if scope is local and there is no loader, then
+         // don't go further to try and get value
+         if (!opScopeIsLocal
+             && ((getScope().isDistributed() && !isHDFSRegion())
+                 || hasServerProxy()
+                 || basicGetLoader() != null)) { 
+           // serialize search/load threads if not in txn
+           // TODO OFFHEAP OPTIMIZE: findObject can be enhanced to use the retainResult flag
+           value = getDataView().findObject(keyInfo,
+               this, isCreate, generateCallbacks, value, disableCopyOnRead,
+               preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
+           if (!returnTombstones && value == Token.TOMBSTONE) {
+             value = null;
+           }
+         }
+         else { // local scope with no loader, still might need to update stats
+           if (isCreate) {
+             recordMiss(null, key);
+           }
+           value = null;
+         }
+       }
+       return value;
+     }
+     finally {
+       stats.endGet(start, isMiss);
+     }
+   }
+ 
+   /**
+    * Update region and potentially entry stats for the miss case 
+    * @param re optional region entry, fetched if null
+    * @param key the key used to fetch the region entry
+    */
+   public final void recordMiss(final RegionEntry re, Object key) {
+     final RegionEntry e;
+     if (re == null && !isTX() && !isHDFSRegion()) {
+       e = basicGetEntry(key);
+     } else {
+       e = re;
+     }
+     updateStatsForGet(e, false);
+   }
+ 
+   /**
+    * @return true if this region has been configured for HDFS persistence
+    */
+   public boolean isHDFSRegion() {
+     return false;
+   }
+ 
+   /**
+    * @return true if this region is configured to read and write data from HDFS
+    */
+   public boolean isHDFSReadWriteRegion() {
+     return false;
+   }
+ 
+   /**
+    * @return true if this region is configured to only write to HDFS
+    */
+   protected boolean isHDFSWriteOnly() {
+     return false;
+   }
+ 
+   /**
+    * FOR TESTING ONLY
+    */
+   public HoplogListenerForRegion getHoplogListener() {
+     return hoplogListener;
+   }
+   
+   /**
+    * FOR TESTING ONLY
+    */
+   public HdfsRegionManager getHdfsRegionManager() {
+     return hdfsManager;
+   }
+   
+   /**
+    * Optimized to allow only one thread to do a search/load; other threads wait
+    * on a future.
+    *
+    * @param keyInfo the key info identifying the entry to search for
+    * @param p_isCreate
+    *                true if the call found no entry; false if updating an existing
+    *                entry
+    * @param generateCallbacks whether callbacks should be generated
+    * @param p_localValue
+    *                the value retrieved from the region for this object.
+    * @param disableCopyOnRead if true then do not make a copy
+    * @param preferCD true if the preferred result form is CachedDeserializable
+    * @param clientEvent the client event, if any
+    * @param returnTombstones whether to return tombstones
+    */
+   @Retained
+   Object nonTxnFindObject(KeyInfo keyInfo, boolean p_isCreate,
+       boolean generateCallbacks, Object p_localValue, boolean disableCopyOnRead, boolean preferCD,
+       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) 
+       throws TimeoutException, CacheLoaderException
+   {
+     final Object key = keyInfo.getKey();
+   
+     Object localValue = p_localValue;
+     boolean isCreate = p_isCreate;
+     Object[] valueAndVersion = null;
+     @Retained Object result = null;
+     FutureResult thisFuture = new FutureResult(this.stopper);
+     Future otherFuture = (Future)this.getFutures.putIfAbsent(keyInfo.getKey(), thisFuture);
+     // only one thread can get their future into the map for this key at a time
+     if (otherFuture != null) {
+       try {
+         valueAndVersion = (Object[])otherFuture.get();
+         if (valueAndVersion != null) {
+           result = valueAndVersion[0];
+           if (clientEvent != null) {
+             clientEvent.setVersionTag((VersionTag)valueAndVersion[1]);
+           }
+           if (!preferCD && result instanceof CachedDeserializable) {
+             CachedDeserializable cd = (CachedDeserializable)result;
+             // fix for bug 43023
+             if (!disableCopyOnRead && isCopyOnRead()) {
+               result = cd.getDeserializedWritableCopy(null, null);
+             } else {
+               result = cd.getDeserializedForReading();
+             }
+            
+           } else if (!disableCopyOnRead) {
+             result = conditionalCopy(result);
+           }
+           // For sqlf the deserialized value is an off-heap chunk, so increase
+           // its use count before returning the found value
 -          if(GemFireCacheImpl.sqlfSystem() && result instanceof Chunk) {
 -            if(!((Chunk)result).retain()) {
++          if(GemFireCacheImpl.sqlfSystem() && result instanceof ObjectChunk) {
++            if(!((ObjectChunk)result).retain()) {
+               return null;
+             }
+           }
+           // what was a miss is now a hit
+           RegionEntry re = null;
+           if (isCreate) {
+             re = basicGetEntry(keyInfo.getKey());
+             updateStatsForGet(re, true);
+           }
+           return result;
+         }
+         // if value == null, try our own search/load
+       }
+       catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         // TODO check a CancelCriterion here?
+         return null;
+       }
+       catch (ExecutionException e) {
+         // unexpected since there is no background thread
+         InternalGemFireError err = new InternalGemFireError(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION.toLocalizedString());
+         err.initCause(e);
+         throw err;
+       }
+     }
+     // didn't find a future, do one more probe for the entry to catch a race
+     // condition where the future was just removed by another thread
+     try {
+       boolean partitioned = this.getDataPolicy().withPartitioning();
+       if (!partitioned) {
+         localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false);
+ 
+         // stats have now been updated
+         if (localValue != null && !Token.isInvalid(localValue)) {
+           result = localValue;
+           return result;
+         }
+         isCreate = localValue == null;
+         result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
+             localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
+ 
+       } else {
+         
+         // This code was moved from PartitionedRegion.nonTxnFindObject().  That method has been removed.
+         // For PRs we don't want to deserialize the value and we can't use findObjectInSystem because
+         // it can invoke code that is transactional.
+         result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
+             localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
+         // TODO why are we not passing the client event or returnTombstones in the above invocation?
+       }
+ 
+       if (result == null && localValue != null) {
+         if (localValue != Token.TOMBSTONE || returnTombstones) {
+           result = localValue;
+         }
+       }
+       // findObjectInSystem does not call conditionalCopy
+     }
+     finally {
+       if (result != null) {
+         VersionTag tag = (clientEvent==null)? null : clientEvent.getVersionTag();
+         thisFuture.set(new Object[]{result, tag});
+       } else {
+         thisFuture.set(null);
+       }
+       this.getFutures.remove(keyInfo.getKey());
+     }
+     if (!disableCopyOnRead) {
+       result = conditionalCopy(result);
+     }
+     return result;
+   }
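
The future-based coordination above keeps a burst of misses on one key from turning into many redundant searches/loads. A standalone sketch of the same single-searcher idea using only java.util.concurrent (a simplification under assumed names, not the Geode implementation):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.FutureTask;
    import java.util.function.Function;

    final class SingleFlightLoader<K, V> {
      private final ConcurrentMap<K, FutureTask<V>> inFlight = new ConcurrentHashMap<>();

      V load(K key, Function<K, V> slowLookup) throws Exception {
        FutureTask<V> task = new FutureTask<>(() -> slowLookup.apply(key));
        FutureTask<V> existing = inFlight.putIfAbsent(key, task);
        if (existing != null) {
          return existing.get();      // another thread is already searching; wait on its future
        }
        try {
          task.run();                 // we won the putIfAbsent race: do the search/load ourselves
          return task.get();
        } finally {
          inFlight.remove(key, task); // always clear the future, like the finally block above
        }
      }
    }
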
+ 
+   /**
+    * Returns true if get should give a copy; false if a reference.
+    *
+    * @since 4.0
+    */
+   protected boolean isCopyOnRead()
+   {
+     return this.compressor == null
+       && this.cache.isCopyOnRead()
+       && ! this.isUsedForPartitionedRegionAdmin
+       && ! this.isUsedForMetaRegion
+       && ! getOffHeap()
+       && ! isSecret();
+   }
+ 
+   /**
+    * Makes a copy, if copy-on-get is enabled, of the specified object.
+    *
+    * @since 4.0
+    */
+   protected Object conditionalCopy(Object o)
+   {
+     if (isCopyOnRead() && !Token.isInvalid(o)) {
+       return CopyHelper.copy(o);
+     }
+     else {
+       return o;
+     }
+   }
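
conditionalCopy is what backs the copy-on-read guarantee: when it is enabled, a read never hands the cached reference to application code. A small sketch of the same idea using the public CopyHelper utility (the defensiveRead helper is illustrative, not part of this class):

    import com.gemstone.gemfire.CopyHelper;

    final class CopyOnReadExample {
      /** With copyOnRead the caller gets a private copy it may mutate; otherwise treat the reference as read-only. */
      static <T> T defensiveRead(T storedValue, boolean copyOnRead) {
        return copyOnRead ? CopyHelper.copy(storedValue) : storedValue;
      }
    }
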
+ 
+   private final String fullPath;
+ 
+   public String getFullPath()
+   {
+     return this.fullPath;
+   }
+ 
+   //   public String getFullPath() {
+   //     // work way up to root region, prepending
+   //     // the region names to a buffer
+   //     StringBuffer buf = new StringBuffer(SEPARATOR);
+   //     Assert.assertTrue(this.regionName != null);
+   //     buf.append(this.regionName);
+   //     LocalRegion r = this;
+   //     while ((r = r.parentRegion) != null) {
+   //       buf.insert(0, r.regionName);
+   //       buf.insert(0, SEPARATOR_CHAR);
+   //     }
+   //     return buf.toString();
+   //   }
+ 
+   public Region getParentRegion()
+   {
+     //checkReadiness();
+     return this.parentRegion;
+   }
+ 
+   public Region getSubregion(String path)
+   {
+     checkReadiness();
+     return getSubregion(path, false);
+   }
+ 
+   public void invalidateRegion(Object aCallbackArgument)
+       throws TimeoutException
+   {
+     getDataView().checkSupportsRegionInvalidate();
+     validateCallbackArg(aCallbackArgument);
+     checkReadiness();
+     checkForLimitedOrNoAccess();
+     RegionEventImpl event = new RegionEventImpl(this, Operation.REGION_INVALIDATE,
+       aCallbackArgument, false, getMyId(), generateEventID());
+ 
+     basicInvalidateRegion(event);
+   }
+ 
+   public Object put(Object key, Object value, Object aCallbackArgument)
+       throws TimeoutException, CacheWriterException {
+     long startPut = CachePerfStats.getStatTime();
+     EntryEventImpl event = newUpdateEntryEvent(key, value, aCallbackArgument);
+     // Since SQLFire directly calls validatedPut, the freeing is done in
+     // validatedPut
+     return validatedPut(event, startPut);
+     // TODO OFFHEAP: validatedPut calls freeOffHeapResources
+   }
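
As validatedPut shows below, the value handed back by put is the previous mapping when one was available on heap. From the public API that is the familiar Map-style contract; a short illustrative sketch (region and types are assumed):

    import com.gemstone.gemfire.cache.Region;

    final class PutExample {
      static Object replaceValue(Region<String, String> region, String key, String newValue) {
        // null when there was no previous entry, or its old value was not available
        return region.put(key, newValue);
      }
    }
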
+ 
+   public final Object validatedPut(EntryEventImpl event, long startPut)
+       throws TimeoutException, CacheWriterException {
+ 
+     try {
+       if (event.getEventId() == null && generateEventID()) {
+         event.setNewEventId(cache.getDistributedSystem());
+       }
+       Object oldValue = null;
+       // Sqlf changes begin
+       // see #40294.
+ 
+       // this has to be an update, so execute it as an update
+       boolean forceUpdateForDelta = event.hasDelta();
+       // Sqlf Changes end.
+       if (basicPut(event, false, // ifNew
+           forceUpdateForDelta, // ifOld
+           null, // expectedOldValue
+           false // requireOldValue
+       )) {
+         if (!event.isOldValueOffHeap()) {
+           // don't copy it to heap just to return from put.
+           // TODO: come up with a better way to do this.
+           oldValue = event.getOldValue();
+         }
+         if (!getDataView().isDeferredStats()) {
+           getCachePerfStats().endPut(startPut, false);
+         }
+       }
+       return handleNotAvailable(oldValue);
+     } finally {
+       event.release();
+     }
+   }
+ 
+   // split into a separate newUpdateEntryEvent since SQLFabric may need to
+   // manipulate event before doing the put (e.g. posDup flag)
+   public final EntryEventImpl newUpdateEntryEvent(Object key, Object value,
+       Object aCallbackArgument) {
+ 
+     validateArguments(key, value, aCallbackArgument);
+     if (value == null) {
+       throw new NullPointerException(LocalizedStrings
+           .LocalRegion_VALUE_MUST_NOT_BE_NULL.toLocalizedString());
+     }
+     checkReadiness();
+     checkForLimitedOrNoAccess();
+     discoverJTA();
+ 
+     // This used to call the constructor which took the old value. It
+     // was modified to call the other EntryEventImpl constructor so that
+     // an id will be generated by default. Null was passed in anyway.
+     //   generate EventID
+     final EntryEventImpl event = EntryEventImpl.create(
+         this, Operation.UPDATE, key,
+         value, aCallbackArgument, false, getMyId());
+     boolean eventReturned = false;
+     try {
+     extractDeltaIntoEvent(value, event);
+     eventReturned = true;
+     return event;
+     } finally {
+       if (!eventReturned) event.release();
+     }
+   }
+   /**
+    * Creates an EntryEventImpl that is optimized to not fetch data from HDFS.
+    * This is meant to be used by PUT dml from GemFireXD.
+    */
+   public final EntryEventImpl newPutEntryEvent(Object key, Object value,
+       Object aCallbackArgument) {
+     EntryEventImpl ev = newUpdateEntryEvent(key, value, aCallbackArgument);
+     ev.setFetchFromHDFS(false);
+     ev.setPutDML(true);
+     return ev;
+   }
+   private void extractDeltaIntoEvent(Object value, EntryEventImpl event) {
+     // 1. Check for DS-level delta property.
+     // 2. Default value for operation type is UPDATE, so no need to check that here.
+     // 3. Check if it has server region proxy. 
+     //    We do not have a handle to event in PutOpImpl to check if we have 
+     //    delta bytes calculated already. So no need to calculate it here.
+     // 4. Check if value is instanceof com.gemstone.gemfire.Delta
+     // 5. Check if Region in PR with redundantCopies > 0. Set extractDelta.
+     // 6. Check if Region has peers. Set extractDelta.
+     // 7. Check if it has any delta proxies attached to it. Set extractDelta.
+     // 8. If extractDelta is set, check if it has delta.
+     // 9. If delta is found, extract it and set it into the event.
+     // 10. If any exception is caught while invoking the delta callbacks, throw it back.
+     // 11. Wrap any checked exception in InternalGemFireException before throwing it.
+     try {
+       boolean extractDelta = false;
+       // How costly is this if check?
+       if (this.getSystem().getConfig().getDeltaPropagation()
+           && value instanceof com.gemstone.gemfire.Delta) {
+         if (!this.hasServerProxy()) {
+           if ((this instanceof PartitionedRegion)) {
+             if (((PartitionedRegion)this).getRedundantCopies() > 0) {
+               extractDelta = true;
+             } else {
+               InternalDistributedMember ids = (InternalDistributedMember)PartitionRegionHelper
+                   .getPrimaryMemberForKey(this, event.getKey());
+               if (ids != null) {
+                 if (this.getSystem().getMemberId().equals(ids.getId())) {
+                   extractDelta = hasAdjunctRecipientsNeedingDelta(event);
+                 } else {
+                   extractDelta = true;
+                 }
+               } else {
+                 extractDelta = true;
+               }
+             }
+           } else if ((this instanceof DistributedRegion)
+               && !((DistributedRegion)this).scope.isDistributedNoAck()
+               && ((DistributedRegion)this).getCacheDistributionAdvisor()
+                   .adviseCacheOp().size() > 0) {
+             extractDelta = true;
+           }
+           if (!extractDelta && ClientHealthMonitor.getInstance() != null) {
+             extractDelta = ClientHealthMonitor.getInstance().hasDeltaClients();
+           }
+         } else if (HandShake.isDeltaEnabledOnServer()) {
+           // This is a client region
+           extractDelta = true;
+         }
+         if (extractDelta && ((com.gemstone.gemfire.Delta)value).hasDelta()) {
+           HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+           long start = DistributionStats.getStatTime();
+           try {
+             ((com.gemstone.gemfire.Delta)value).toDelta(hdos);
+           } catch (RuntimeException re) {
+             throw re;
+           } catch (Exception e) {
+             throw new DeltaSerializationException(
+                 LocalizedStrings.DistributionManager_CAUGHT_EXCEPTION_WHILE_SENDING_DELTA
+                     .toLocalizedString(), e);
+           }
+           event.setDeltaBytes(hdos.toByteArray());
+           this.getCachePerfStats().endDeltaPrepared(start);
+         }
+       }
+     } catch (RuntimeException re) {
+       throw re;
+     } catch (Exception e) {
+       throw new InternalGemFireException(e);
+     }
+   }
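
extractDeltaIntoEvent only does work when the value implements com.gemstone.gemfire.Delta and reports hasDelta(). A minimal sketch of a value type that would take that path (field and class names are illustrative; a real value would also need normal serialization for the initial full put):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import com.gemstone.gemfire.Delta;
    import com.gemstone.gemfire.InvalidDeltaException;

    class Counter implements Delta, java.io.Serializable {
      private int total;
      private transient int pendingIncrement;  // the only change since the last toDelta()

      public void increment(int by) { total += by; pendingIncrement += by; }

      @Override
      public boolean hasDelta() { return pendingIncrement != 0; }

      @Override
      public void toDelta(DataOutput out) throws IOException {
        out.writeInt(pendingIncrement);        // ends up in event.setDeltaBytes(...) on the sending side
        pendingIncrement = 0;
      }

      @Override
      public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
        total += in.readInt();
      }
    }
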
+ 
+   @SuppressWarnings("unchecked")
+   private boolean hasAdjunctRecipientsNeedingDelta(EntryEventImpl event) {
+     PartitionedRegion pr = ((PartitionedRegion)this);
+     BucketRegion br = null;
+     FilterRoutingInfo filterRouting = null;
+     Set twoMessages = Collections.EMPTY_SET;
+     Set adjunctRecipients = Collections.EMPTY_SET;
+     Set cacheservers = null;
+ 
+     int bId = event.getKeyInfo().getBucketId();
+     try {
+       br = pr.dataStore.getInitializedBucketForId(event.getKey(), bId);
+     } catch (ForceReattemptException fre) {
+       return true;
+     }
+     Set<InternalDistributedMember> recipients = br.getCacheDistributionAdvisor().adviseUpdate(event);
+     twoMessages = br.getBucketAdvisor().adviseRequiresTwoMessages();
+     CacheDistributionAdvisor cda = pr.getCacheDistributionAdvisor();
+     filterRouting = cda.adviseFilterRouting(event, recipients);
+     adjunctRecipients = br.getAdjunctReceivers(event, recipients, twoMessages, filterRouting);
+     cacheservers = cda.adviseCacheServers();
+     return !Collections.disjoint(adjunctRecipients, cacheservers);
+   }
+ 
+   public Region.Entry getEntry(Object key)
+   {
+     validateKey(key);
+     checkReadiness();
+     checkForNoAccess();
+     discoverJTA();
+     return getDataView().getEntry(getKeyInfo(key), this, false);
+   }
+ 
+   /** internally we often need to get an entry whether it is a tombstone or not */
+   public Region.Entry getEntry(Object key, boolean allowTombstones) {
+     return getDataView().getEntry(getKeyInfo(key), this, allowTombstones);
+   }
+ 
+   /**
+    * Just like getEntry, but depending on a flag it also updates the stats that a
+    * get would have updated. See bug 42410. Also skips discovering JTA.
+    *
+    * @param key the key of the entry to access
+    * @return the entry if it exists; otherwise null.
+    */
+   public Entry accessEntry(Object key, boolean updateStats) {
+     validateKey(key);
+     checkReadiness();
+     checkForNoAccess();
+     if(updateStats) {
+       return getDataView().accessEntry(getKeyInfo(key), this);
+     } else {
+       return getDataView().getEntry(getKeyInfo(key), this, false);
+     }
+   }
+ 
+   protected boolean includeHDFSResults() {
+     return isUsedForPartitionedRegionBucket() 
+         && isHDFSReadWriteRegion() 
+         && getPartitionedRegion().includeHDFSResults();
+   }
+   
+ 
+   /** a fast estimate of total number of entries locally in the region */
+   public long getEstimatedLocalSize() {
+     RegionMap rm;
+     if (!this.isDestroyed) {
+       long size;
+       if (isHDFSReadWriteRegion() && this.initialized) {
+         // this size is not used by HDFS region iterators
+         // fixes bug 49239
+         return 0;
+       }
+       // if region has not been initialized yet, then get the estimate from
+       // disk region's recovery map if available
+       if (!this.initialized && this.diskRegion != null
+           && (rm = this.diskRegion.getRecoveredEntryMap()) != null
+           && (size = rm.size()) > 0) {
+         return size;
+       }
+       if ((rm = getRegionMap()) != null) {
+         return rm.size();
+       }
+     }
+     return 0;
+   }
+   /**
+    * @param keyInfo
+    * @param access
+    *          true if caller wants last accessed time updated
+    * @param allowTombstones whether an entry with a TOMBSTONE value can be returned
+    * @return TODO
+    */
+   protected Region.Entry nonTXGetEntry(KeyInfo keyInfo, boolean access, boolean allowTombstones) {
+     final Object key = keyInfo.getKey();
+     RegionEntry re = this.entries.getEntry(key);
+     boolean miss = (re == null || re.isDestroyedOrRemoved());
+     if (access) {
+       updateStatsForGet(re, !miss);
+     }
+     if (re == null) {
+       return null;
+     }
+     if (re.isTombstone()) {
+       if (!allowTombstones) {
+         return null;
+       } // else return an entry (client GII / putAll results)
+     } else if (miss) {
+       return null;
+     }
+ 
+     Region.Entry ren = new NonTXEntry(re);
+     //long start=0, end=0;
+     //start = System.currentTimeMillis();
+     //end = System.currentTimeMillis();
+     //System.out.println("getEntry: " + (end-start));
+     return ren;
+   }
+ 
+   /**
+    * @return true if the cache that owns this region has been closed
+    */
+   protected boolean isClosed()
+   {
+     return this.cache.isClosed();
+   }
+ 
+   /**
+    * Returns true if this region is or has been closed or destroyed.
+    * Note that unlike {@link #isDestroyed()} this method will not
+    * return true if the cache is closing but has not yet started closing
+    * this region.
+    */
+   public boolean isThisRegionBeingClosedOrDestroyed() {
+     return this.isDestroyed;
+   }
+   
+   /** returns true if this region has been destroyed */
+   public boolean isDestroyed()
+   {
+     if (isClosed()) {
+       return true; // for bug 42328
+     }
+     boolean isTraceEnabled = logger.isTraceEnabled();
+     //    boolean result = false;
+     if (this.isDestroyed) {
+       if (isTraceEnabled) {
+         logger.trace("isDestroyed: true, this.isDestroyed: {}", getFullPath());
+       }
+       return true;
+     }
+     //    if (!isInitialized()) { // don't return true if still initializing
+     //      if (finestEnabled) {
+     //        log.finest("isDestroyed: false, not initialized: " + getFullPath());
+     //      }
+     //      return false;
+     //    }
+     // @todo we could check parents here if we want this to be more accurate,
+     // and the isDestroyed field could be made volatile as well.
+     // if (this.parentRegion != null) return this.parentRegion.isDestroyed();
+     if (isTraceEnabled) {
+       logger.trace("isDestroyed: false : {}", getFullPath());
+     }
+     return false;
+   }
+ 
+   /** a variant of subregions() that does not perform a readiness check */
+   protected Set basicSubregions(boolean recursive) {
+     return new SubregionsSet(recursive);
+   }
+   
+   public Set subregions(boolean recursive) {
+     checkReadiness();
+     return new SubregionsSet(recursive);
+   }
+ 
+   public Set entries(boolean recursive) {
+     checkReadiness();
+     checkForNoAccess();
+     return basicEntries(recursive);
+   }
+ 
+   /** Returns set of entries without performing validation checks. */
+   public Set basicEntries(boolean recursive) {
+     return new EntriesSet(this, recursive, IteratorType.ENTRIES, false);
+   }
+ 
+   /**
+    * A flavor of keys() that will not do repeatable read.
+    * @since 5.5
+    */
+   public Set testHookKeys()
+   {
+     checkReadiness();
+     checkForNoAccess();
+     return new EntriesSet(this, false, IteratorType.KEYS,
+         false /* dontRememberReads */, false /* skipTxCheckInIteration */,
+         false /* allowTombstones */);
+   }
+ 
+   public Set keys()
+   {
+     checkReadiness();
+     checkForNoAccess();
+     return new EntriesSet(this, false, IteratorType.KEYS, false);
+   }
+ 
+   /**
+    * return a set of the keys in this region
+    * @param allowTombstones whether destroyed entries should be included
+    * @return the keys
+    */
+   public Set keySet(boolean allowTombstones) {
+     checkReadiness();
+     checkForNoAccess();
+     return new EntriesSet(this, false, IteratorType.KEYS, allowTombstones);
+   }
+ 
+   public Collection values()
+   {
+     checkReadiness();
+     checkForNoAccess();
+     return new EntriesSet(this, false, IteratorType.VALUES, false);
+   }
+ 
+   public Object getUserAttribute()
+   {
+     return this.regionUserAttribute;
+   }
+ 
+   public void setUserAttribute(Object value)
+   {
+     checkReadiness();
+     this.regionUserAttribute = value;
+   }
+ 
+   public boolean containsKey(Object key)
+   {
+     checkReadiness();
+     checkForNoAccess();
+     return getDataView().containsKey(getKeyInfo(key), this);
+   }
+ 
+   public boolean containsTombstone(Object key)
+   {
+     checkReadiness();
+     checkForNoAccess();
+     if (!this.concurrencyChecksEnabled) {
+       return false;
+     } else {
+       try {
+         Entry entry = getDataView().getEntry(getKeyInfo(key), this, true);
+         if (entry == null) {
+           return false;
+         } else {
+           return (entry.getValue() == Token.TOMBSTONE);
+         }
+       } catch (EntryDestroyedException e) {
+         return true;
+       }
+     }
+   }
+ 
+   protected boolean nonTXContainsKey(KeyInfo keyInfo) {
+     boolean contains = getRegionMap().containsKey(keyInfo.getKey());
+     if (contains && this.imageState.isClient()) {
+       // fix for bug #40871 - concurrent RI causes containsKey for destroyed entry
+       // to return true
+       RegionEntry re = this.entries.getEntry(keyInfo.getKey());
+       // TODO:KIRK:OK if (re == null || Token.isRemoved(re.getValueInVM(this))) {
+       if (re == null || re.isDestroyedOrRemoved()) {
+         contains = false;
+       }
+     }
+     return contains;
+   }
+ 
+   public boolean containsValueForKey(Object key)
+   {
+     discoverJTA();
+     return getDataView().containsValueForKey(getKeyInfo(key), this);
+   }
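
containsValueForKey differs from containsKey in exactly the way the token handling below suggests: an INVALID entry still has a key but no value. A short illustrative sketch (the describe helper is assumed, not part of this class):

    import com.gemstone.gemfire.cache.Region;

    final class ContainsExample {
      static String describe(Region<String, String> region, String key) {
        if (!region.containsKey(key)) return "no entry";
        if (!region.containsValueForKey(key)) return "entry present but its value is invalid";
        return "entry with a value";
      }
    }
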
+ 
+   /**
+    * @param keyInfo
+    * @return TODO
+    */
+   protected boolean nonTXContainsValueForKey(KeyInfo keyInfo) {
+     checkReadiness();
+     checkForNoAccess();
+     if (this.diskRegion != null) {
+       this.diskRegion.setClearCountReference();
+     }
+     try {
+       RegionEntry entry = this.entries.getEntry(keyInfo.getKey());
+       boolean result = entry != null;
+       if (result) {
+         ReferenceCountHelper.skipRefCountTracking();
+         Object val = entry.getTransformedValue(); // no need to decompress since we only want to know if we have an existing value 
+         if (val instanceof StoredObject) {
+           OffHeapHelper.release(val);
+           ReferenceCountHelper.unskipRefCountTracking();
+           return true;
+         }
+         ReferenceCountHelper.unskipRefCountTracking();
+         // No need to check CachedDeserializable because of Bruce's fix in r30960 for bug 42162. See bug 42732.
+         // this works because INVALID and LOCAL_INVALID will never be faulted out of mem
+         // If val is NOT_AVAILABLE that means we have a valid value on disk.
+         result = !Token.isInvalidOrRemoved(val);
+       }
+       return result;
+     }
+     finally {
+       if (this.diskRegion != null) {
+         this.diskRegion.removeClearCountReference();
+       }
+     }
+   }
+ 
+   public RegionAttributes getAttributes()
+   {
+     // to fix bug 35134 allow attribute access on closed regions
+     //checkReadiness();
+     return this;
+   }
+ 
+   public String getName()
+   {
+     return this.regionName;
+   }
+ 
+   /**
+    * Convenience method to get region name for logging/exception messages.
+    * If this region is a bucket region, it returns the
+    * bucket region name.
+    * @return name of the region or the owning partitioned region
+    */
+   public String getDisplayName() {
+     if (this.isUsedForPartitionedRegionBucket()) {
+       r

<TRUNCATED>

