geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [17/57] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Thu, 09 Jul 2015 17:02:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
index 3d8a5f5..ed80e75 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRegionState.java
@@ -10,12 +10,15 @@ package com.gemstone.gemfire.internal.cache;
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.cache.*;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantReadWriteLock;
+import com.gemstone.gemfire.internal.logging.LogService;
 
 import java.util.*;
 import java.util.Map.Entry;
 
+import org.apache.logging.log4j.Logger;
+
 /** TXRegionState is the entity that tracks all the changes a transaction
  * has made to a region.
  *
@@ -26,16 +29,22 @@ import java.util.Map.Entry;
  * @see TXManagerImpl
  */
 public class TXRegionState {
+  private static final Logger logger = LogService.getLogger();
+  
   // A map of Objects (entry keys) -> TXEntryState
   private final HashMap<Object, TXEntryState> entryMods;
   // A map of Objects (entry keys) -> TXEntryUserAttrState
   private HashMap uaMods;
   private Set<InternalDistributedMember> otherMembers = null;
-  private LocalRegion region;
   private TXState txState;
+  private LocalRegion region;
   private final boolean needsRefCounts;
   private boolean cleanedUp;
-  
+  /*
+   * For distributed transactions (DistTx):
+   * whether this region state was created during precommit, to apply changes on secondaries/replicates from the coordinator.
+   */
+  private boolean createdDuringCommit;
 
   public TXRegionState(LocalRegion r,TXState txState) 
   {
@@ -74,7 +83,15 @@ public class TXRegionState {
   }
   public TXEntryState createReadEntry(LocalRegion r, Object entryKey, RegionEntry re, Object vId, Object pendingValue) {
     GemFireCacheImpl cache = r.getCache();
-    TXEntryState result = cache.getTXEntryStateFactory().createEntry(re, vId, pendingValue, entryKey, this);
+    boolean isDistributed = false;
+    if (cache.getTxManager().getTXState() != null) {
+      isDistributed = cache.getTxManager().getTXState().isDistTx(); 
+    }
+    else {
+      // TXCoordinator and datanode are the same
+      isDistributed = cache.getTxManager().isDistributed();
+    }
+    TXEntryState result = cache.getTXEntryStateFactory().createEntry(re, vId, pendingValue, entryKey, this, isDistributed);
     this.entryMods.put(entryKey, result);
     return result;
   }
@@ -132,6 +149,10 @@ public class TXRegionState {
     }
     return result;
   }
+  
+  TXEntryState getTXEntryState(Object key) {
+    return this.entryMods.get(key);
+  }
 
   /**
    * Fills in a set of any entries created by this transaction for the provided region.
@@ -154,13 +175,24 @@ public class TXRegionState {
     if (this.uaMods == null && this.entryMods.isEmpty()) {
       return;
     }
+    if (this.txState.logger.isDebugEnabled()) {
+      this.txState.logger.debug("TXRegionState.createLockRequest 1 "
+          + r.getClass().getSimpleName() + " region-state=" + this);
+    }
     if (r.getScope().isDistributed()) {
+      // [DISTTX] Do not take lock for RR on replicates
+      if (this.isCreatedDuringCommit()) {
+        return;
+      }
       DistributedRegion dr = (DistributedRegion)r;
       Set<InternalDistributedMember> advice = dr.getCacheDistributionAdvisor().adviseTX();
       if (!advice.isEmpty()) {
         this.otherMembers = advice; // remember for when it is time to distribute
       }
     }
+    if (this.txState.logger.isDebugEnabled()) {
+      this.txState.logger.debug("TXRegionState.createLockRequest 2");
+    }
     //Bypass D-lock for Pr TX
     boolean byPassDLock = false;
     if (r instanceof BucketRegion) {
@@ -216,6 +248,9 @@ public class TXRegionState {
   }
     
   void checkForConflicts(LocalRegion r) throws CommitConflictException {
+    if (this.isCreatedDuringCommit()) {
+      return;
+    }
     {
       Iterator it = this.entryMods.entrySet().iterator();
       while (it.hasNext()) {
@@ -308,8 +343,7 @@ public class TXRegionState {
       // passed. So do nothing.
     }
   }
-  
-  
+    
   void buildCompleteMessage(LocalRegion r, TXCommitMessage msg) {
     try {
       if (!this.entryMods.isEmpty()) {
@@ -441,4 +475,98 @@ public class TXRegionState {
     // TODO Auto-generated method stub
     return txState;
   }
-}
+
+  public void close() {
+    for (TXEntryState e: this.entryMods.values()) {
+      e.close();
+    }
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder();
+    str.append("{").append(super.toString()).append(" ");
+    str.append(" ,entryMods=").append(this.entryMods);
+    str.append(" ,isCreatedDuringCommit=").append(this.isCreatedDuringCommit());
+    str.append("}");
+    return str.toString();
+  }
+
+  /**
+   * @return the createdDuringCommit
+   */
+  public boolean isCreatedDuringCommit() {
+    return createdDuringCommit;
+  }
+
+  /**
+   * @param createdDuringCommit
+   *          the createdDuringCommit to set
+   */
+  public void setCreatedDuringCommit(boolean createdDuringCommit) {
+    this.createdDuringCommit = createdDuringCommit;
+  }
+  
+  public boolean populateDistTxEntryStateList(
+      ArrayList<DistTxThinEntryState> entryStateList) {
+    String regionFullPath = this.getRegion().getFullPath();
+    try {
+      if (!this.entryMods.isEmpty()) {
+        // [DISTTX] TODO Sort this first
+        for (Entry<Object, TXEntryState> em : this.entryMods.entrySet()) {
+          Object mKey = em.getKey();
+          TXEntryState txes = em.getValue();
+          DistTxThinEntryState thinEntryState = txes.getDistTxEntryStates();
+          entryStateList.add(thinEntryState);
+          if (logger.isDebugEnabled()) {
+            logger.debug("TXRegionState.populateDistTxEntryStateList Added "
+                + thinEntryState + " for key=" + mKey + " ,op="
+                + txes.opToString() + " ,region=" + regionFullPath);
+          }
+        }
+      }
+      return true;
+    } catch (RegionDestroyedException ex) {
+      // region was destroyed out from under us; after conflict checking
+      // passed. So act as if the region destroy happened right after the
+      // commit. We act this way by doing nothing; including distribution
+      // of this region's commit data.
+    } catch (CancelException ex) {
+      // cache was closed out from under us; after conflict checking
+      // passed. So do nothing.
+    }
+    if (logger.isDebugEnabled()) {
+      logger
+          .debug("TXRegionState.populateDistTxEntryStateList Got exception for region "
+              + regionFullPath);
+    }
+    return false;
+  }
+  
+  public void setDistTxEntryStates(
+      ArrayList<DistTxThinEntryState> entryEventList) {
+    String regionFullPath = this.getRegion().getFullPath();
+    int entryModsSize = this.entryMods.size();
+    int entryEventListSize = entryEventList.size();
+    if (entryModsSize != entryEventListSize) {
+      throw new UnsupportedOperationInTransactionException(
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+              "entry size of " + entryModsSize + " for region "
+                  + regionFullPath, entryEventListSize));
+    }
+
+    int index = 0;
+    // [DISTTX] TODO Sort this first
+    for (Entry<Object, TXEntryState> em : this.entryMods.entrySet()) {
+      Object mKey = em.getKey();
+      TXEntryState txes = em.getValue();
+      DistTxThinEntryState thinEntryState = entryEventList.get(index++);
+      txes.setDistTxEntryStates(thinEntryState);
+      if (logger.isDebugEnabled()) {
+        logger.debug("TxRegionState.setDistTxEntryStates Added "
+            + thinEntryState + " for key=" + mKey + " ,op=" + txes.opToString()
+            + " ,region=" + regionFullPath);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
index 9723e48..b760596 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
@@ -178,7 +178,7 @@ public class TXRmtEvent implements TransactionEvent
     if (r.isUsedForPartitionedRegionBucket()) {
       eventRegion = r.getPartitionedRegion();
     }
-    EntryEventImpl event = new EntryEventImpl(
+    EntryEventImpl event = EntryEventImpl.create(
         eventRegion, op, key, newValue,
         aCallbackArgument, // callbackArg
         true, // originRemote
@@ -221,4 +221,14 @@ public class TXRmtEvent implements TransactionEvent
   {
     return this.cache;
   }
+
+  public void freeOffHeapResources() {
+    if (this.events != null) {
+      for (EntryEventImpl e: (List<EntryEventImpl>)this.events) {
+        e.release();
+      }
+    }
+    // TODO Auto-generated method stub
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index fba5e40..2672323 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -7,6 +7,7 @@
  */
 
 package com.gemstone.gemfire.internal.cache;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -44,7 +45,7 @@ import com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess;
 import com.gemstone.gemfire.distributed.TXManagerCancelledException;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
 import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -52,6 +53,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
 import com.gemstone.gemfire.internal.cache.tx.TransactionalOperation.ServerRegionOperation;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 
 /** TXState is the entity that tracks the transaction state on a per
  * thread basis, noting changes to Region entries on a per operation
@@ -64,7 +66,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
  * @see TXManagerImpl
  */
 public class TXState implements TXStateInterface {
-  private static final Logger logger = LogService.getLogger();
+  protected static final Logger logger = LogService.getLogger();
   
   // The nano-timestamp of when the transaction began
   private final long beginTime;
@@ -72,15 +74,15 @@ public class TXState implements TXStateInterface {
   final IdentityHashMap<LocalRegion, TXRegionState> regions;
   
   /** whether completion has been started */
-  private boolean completionStarted;
+  protected boolean completionStarted;
   
   /** whether the transaction has been completed and cleaned up */
-  private boolean closed = false;
+  protected boolean closed = false;
   
   /** guards the completionStarted boolean and the closed boolean */
-  private final Object completionGuard = new Object();
+  protected final Object completionGuard = new Object();
   
-  private TXLockRequest locks = null;
+  protected TXLockRequest locks = null;
   // Used for jta commit lifetime
   private long jtaLifeTime;
   /**
@@ -93,15 +95,15 @@ public class TXState implements TXStateInterface {
 
  // Internal testing hooks
   private Runnable internalAfterReservation;
-  private Runnable internalAfterConflictCheck;
-  private Runnable internalAfterApplyChanges;
-  private Runnable internalAfterReleaseLocalLocks;
+  protected Runnable internalAfterConflictCheck;
+  protected Runnable internalAfterApplyChanges;
+  protected Runnable internalAfterReleaseLocalLocks;
   Runnable internalDuringIndividualSend; // package scope allows TXCommitMessage use
   Runnable internalAfterIndividualSend; // package scope allows TXCommitMessage use
   Runnable internalDuringIndividualCommitProcess; // package scope allows TXCommitMessage use
   Runnable internalAfterIndividualCommitProcess; // package scope allows TXCommitMessage use
-  private Runnable internalAfterSend;
-  private Runnable internalBeforeSend;
+  protected Runnable internalAfterSend;
+  protected Runnable internalBeforeSend;
 
   /**
    * Used to generate eventIDs
@@ -115,11 +117,11 @@ public class TXState implements TXStateInterface {
    * Used to generate eventIDs
    */
   private long baseSequenceId;
-  private final TXStateProxy proxy;
-  private boolean firedWriter = false;
-  private final boolean onBehalfOfRemoteStub;
-  private boolean gotBucketLocks = false;
-  private TXCommitMessage commitMessage = null;
+  protected final TXStateProxy proxy;
+  protected boolean firedWriter = false;
+  protected final boolean onBehalfOfRemoteStub;
+  protected boolean gotBucketLocks = false;
+  protected TXCommitMessage commitMessage = null;
   ClientProxyMembershipID bridgeContext = null;
   /** keeps track of events, so as not to re-apply events*/
   protected Set<EventID> seenEvents = new HashSet<EventID>();
@@ -205,6 +207,12 @@ public class TXState implements TXStateInterface {
     }
   }
 
+  public void freePendingCallbacks() {
+    for (EntryEventImpl ee: getPendingCallbacks()) {
+      ee.release();
+    }
+  }
+
   public List<EntryEventImpl> getPendingCallbacks() {
     return pendingCallbacks;
   }
@@ -235,6 +243,10 @@ public class TXState implements TXStateInterface {
       }
       this.regions.put(r, result);
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("TXState writeRegion flag {} region-state {} ",
+          false, result, new Throwable());
+    }
     return result;
   }
   /* (non-Javadoc)
@@ -276,7 +288,7 @@ public class TXState implements TXStateInterface {
     return this.modSerialNum > Byte.MAX_VALUE;
   }
   
-  private void reserveAndCheck() throws CommitConflictException {
+  protected void reserveAndCheck() throws CommitConflictException {
     if (this.closed) {
       return;
     }
@@ -308,6 +320,14 @@ public class TXState implements TXStateInterface {
     return this.baseSequenceId;
   }
   
+  @Override
+  public void precommit() throws CommitConflictException,
+      UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("precommit"));
+  }
+  
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
    */
@@ -445,7 +465,7 @@ public class TXState implements TXStateInterface {
     }
   }
   
-  private void attachFilterProfileInformation(List entries) {
+  protected void attachFilterProfileInformation(List entries) {
     {
       Iterator/*<TXEntryStateWithRegionAndKey>*/ it = entries.iterator();
       while (it.hasNext()) {
@@ -457,6 +477,7 @@ public class TXState implements TXStateInterface {
              * The event must contain the bucket region
              */
             EntryEventImpl ev = (EntryEventImpl)o.es.getEvent(o.r, o.key, o.es.getTXRegionState().getTXState());
+            try {
             /*
              * The routing information is derived from the PR advisor, not the bucket advisor.
              */
@@ -464,6 +485,9 @@ public class TXState implements TXStateInterface {
             o.es.setFilterRoutingInfo(fri);
             Set set = bucket.getAdjunctReceivers(ev, Collections.EMPTY_SET, new HashSet(), fri);
             o.es.setAdjunctRecipients(set);
+            } finally {
+              ev.release();
+            }
           }
         } catch (RegionDestroyedException ex) {
           // region was destroyed out from under us; after conflict checking
@@ -526,12 +550,16 @@ public class TXState implements TXStateInterface {
    * @return a sorted list of TXEntryStateWithRegionAndKey that will be used
    *  to apply the ops on the nearside in the correct order.
    */
-  private List/*<TXEntryStateWithRegionAndKey>*/ generateEventOffsets() {
+  protected List/*<TXEntryStateWithRegionAndKey>*/ generateEventOffsets() {
     this.baseMembershipId = EventID.getMembershipId(this.proxy.getTxMgr().getDM().getSystem());
     this.baseThreadId = EventID.getThreadId();
     this.baseSequenceId = EventID.getSequenceId();
     
      List/*<TXEntryStateWithRegionAndKey>*/ entries = getSortedEntries();
+    if (logger.isDebugEnabled()) {
+      logger.debug("generateEventOffsets() entries " + entries
+          + " RegionState Map=" + this.regions);
+    }
      Iterator it = entries.iterator();
      while (it.hasNext()) {
        TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey)it.next();
@@ -567,7 +595,7 @@ public class TXState implements TXStateInterface {
   }
   
   
-  private void lockBucketRegions() throws PrimaryBucketException {
+  protected void lockBucketRegions() throws PrimaryBucketException {
     
     boolean lockingSucceeded;
     do {
@@ -577,7 +605,7 @@ public class TXState implements TXStateInterface {
       while (it.hasNext()) {
         Map.Entry<LocalRegion, TXRegionState> me = it.next();
         LocalRegion r = me.getKey();
-        if (r instanceof BucketRegion) {
+        if (r instanceof BucketRegion && (((BucketRegion)r).getBucketAdvisor().isPrimary())) {
           BucketRegion b = (BucketRegion)r;
           /*
            * Lock the primary bucket so it doesnt get rebalanced until we cleanup!
@@ -629,7 +657,7 @@ public class TXState implements TXStateInterface {
   }
   
   
-  private void cleanupNonDirtyRegions() {
+  protected void cleanupNonDirtyRegions() {
         Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<LocalRegion, TXRegionState> me = it.next();
@@ -642,7 +670,7 @@ public class TXState implements TXStateInterface {
    * this builds a new TXCommitMessage and returns it
    * @return the new message
    */
-  private TXCommitMessage buildMessage() {
+  protected TXCommitMessage buildMessage() {
     TXCommitMessage msg = new TXCommitMessage(this.proxy.getTxId(), this.proxy.getTxMgr().getDM(), this);
     Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
     while (it.hasNext()) {
@@ -659,7 +687,7 @@ public class TXState implements TXStateInterface {
    * this builds a new TXCommitMessage and returns it
    * @return the new message
    */
-  private TXCommitMessage buildCompleteMessage() {
+  protected TXCommitMessage buildCompleteMessage() {
     TXCommitMessage msg = new TXCommitMessage(this.proxy.getTxId(), this.proxy.getTxMgr().getDM(), this);
     Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
     while (it.hasNext()) {
@@ -675,7 +703,7 @@ public class TXState implements TXStateInterface {
   /**
    * applies this transaction to the cache.
    */
-  private void applyChanges(List/*<TXEntryStateWithRegionAndKey>*/ entries) {
+  protected void applyChanges(List/*<TXEntryStateWithRegionAndKey>*/ entries) {
     {      
       Iterator<Map.Entry<LocalRegion, TXRegionState>> it = this.regions.entrySet().iterator();
       while (it.hasNext()) {
@@ -717,11 +745,29 @@ public class TXState implements TXStateInterface {
     return new TXEvent(this, getCache());
   }
   
-  private void cleanup() {
+  /**
+   * Note that cleanup does more than is needed in this method.
+   * This method only needs to do stuff that is required when a
+   * Cache close is done and we have txs that are still in progress.
+   * Currently the only thing that is needed is to decrement off-heap
+   * refcounts since off-heap memory lives after a cache close.
+   */
+  @Override 
+  public void close() {
+    if (!this.closed) {
+      this.closed = true;
+      for (TXRegionState r: this.regions.values()) {
+        r.close();
+      }
+    }
+  }
+  
+  protected void cleanup() {
     try {
       this.closed = true;
       this.seenEvents.clear();
       this.seenResults.clear();
+      freePendingCallbacks();
       if (this.locks!=null) {
         final long conflictStart = CachePerfStats.getStatTime();
         this.locks.cleanup();
@@ -737,7 +783,7 @@ public class TXState implements TXStateInterface {
          * Need to unlock the primary lock for rebalancing so that rebalancing can resume.
          */
         if (gotBucketLocks) {
-          if (r instanceof BucketRegion) {
+          if (r instanceof BucketRegion && (((BucketRegion)r).getBucketAdvisor().isPrimary())) {
             try {
               ((BucketRegion)r).doUnlockForPrimary();
             } catch(RegionDestroyedException rde) {
@@ -1113,7 +1159,7 @@ public class TXState implements TXStateInterface {
 
     LocalRegion region = event.getRegion();
     if (checkResources) {
-      if (!InternalResourceManager.isLowMemoryExceptionDisabled()) {
+      if (!MemoryThresholds.isLowMemoryExceptionDisabled()) {
         region.checkIfAboveThreshold(event);
       }
     }
@@ -1292,7 +1338,7 @@ public class TXState implements TXStateInterface {
    * If this parameter is not null it must match the current value of the entry
    * or an EntryNotFoundException is thrown.
    */
-  private TXEntryState txReadEntry(KeyInfo keyInfo, LocalRegion localRegion,
+  protected TXEntryState txReadEntry(KeyInfo keyInfo, LocalRegion localRegion,
       boolean rememberRead, Object expectedOldValue, boolean createIfAbsent)
   throws EntryNotFoundException
   {
@@ -1313,7 +1359,7 @@ public class TXState implements TXStateInterface {
     if (result != null) {
       if (expectedOldValue != null) { 
         Object val = result.getNearSidePendingValue();
-        if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, val)) {
+        if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, val, localRegion)) {
           txr.cleanupNonDirtyEntries(localRegion);
           throw new EntryNotFoundException(LocalizedStrings.AbstractRegionMap_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString());
         }
@@ -1346,7 +1392,7 @@ public class TXState implements TXStateInterface {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
-  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones) {
+  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
     TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/*create txEntry is absent*/);
     if (tx != null) {
       Object v = tx.getValue(keyInfo, localRegion, preferCD);
@@ -1355,7 +1401,7 @@ public class TXState implements TXStateInterface {
       }
       return v;
     } else {
-      return localRegion.getDeserializedValue(keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones);
+      return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
     }
   }
 
@@ -1363,14 +1409,16 @@ public class TXState implements TXStateInterface {
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
    */
-  public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws DataLocationException {
+  @Retained
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, 
+      boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     final Object key = keyInfo.getKey();
     TXEntryState tx = txReadEntry(keyInfo, localRegion, true,true/*create txEntry is absent*/);
     if (tx != null) {
       Object val = tx.getPendingValue();
       if(val==null || Token.isInvalidOrRemoved(val)) {
         val = findObject(keyInfo,localRegion, val!=Token.INVALID,
-            true, val, false, false, requestingClient, clientEvent, false);
+            true, val, false, false, requestingClient, clientEvent, false, allowReadFromHDFS);
       }
       return val;
     } else {
@@ -1378,7 +1426,7 @@ public class TXState implements TXStateInterface {
       // so we should never come here
       assert localRegion instanceof PartitionedRegion;
       PartitionedRegion pr = (PartitionedRegion)localRegion;
-      return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, returnTombstones);
+      return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, returnTombstones, allowReadFromHDFS);
     }
   }
 
@@ -1414,6 +1462,7 @@ public class TXState implements TXStateInterface {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getValueInVM(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
+  @Retained
   public Object getValueInVM(KeyInfo keyInfo, LocalRegion localRegion,
       boolean rememberRead) {
     TXEntryState tx = txReadEntry(keyInfo, localRegion, rememberRead,true/*create txEntry is absent*/);
@@ -1456,8 +1505,8 @@ public class TXState implements TXStateInterface {
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
    */
   public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
-      boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) {
-    return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+      boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+    return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
   }
 
   private boolean readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion,
@@ -1692,11 +1741,15 @@ public class TXState implements TXStateInterface {
 //	        final boolean requiresRegionContext = theRegion.keyRequiresRegionContext();
 	        InternalDistributedMember myId = theRegion.getDistributionManager().getDistributionManagerId();
 	        for (int i = 0; i < putallOp.putAllDataSize; ++i) {
-	          EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion, myId,myId, i, putallOp.putAllData, false, putallOp.getBaseEvent().getContext(), false, !putallOp.getBaseEvent().isGenerateCallbacks());
+	          EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion, myId,myId, i, putallOp.putAllData, false, putallOp.getBaseEvent().getContext(), false, !putallOp.getBaseEvent().isGenerateCallbacks(), false);
+	          try {
 	          ev.setPutAllOperation(putallOp);
 	          if (theRegion.basicPut(ev, false, false, null, false)) {
 	            successfulPuts.addKeyAndVersion(putallOp.putAllData[i].key, null);
 	          }
+	          } finally {
+	            ev.release();
+	          }
 	        }
 	      }
 	    }, putallOp.getBaseEvent().getEventId());
@@ -1749,4 +1802,29 @@ public class TXState implements TXStateInterface {
       throws EntryNotFoundException {
     // Do nothing. Not applicable for transactions.    
   }
+  
+  @Override
+  public boolean isTxState() {
+    return true;
+  }
+  
+  @Override
+  public boolean isTxStateStub() {
+    return false;
+  }
+  
+  @Override
+  public boolean isTxStateProxy() {
+    return false;
+  }
+  
+  @Override
+  public boolean isDistTx() {
+    return false;
+  }
+  
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
index b117034..feff206 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
@@ -19,6 +19,7 @@ import javax.transaction.Synchronization;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CommitConflictException;
 import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
 import com.gemstone.gemfire.cache.Region.Entry;
 import com.gemstone.gemfire.cache.client.internal.ServerRegionDataAccess;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
@@ -79,6 +80,11 @@ public interface TXStateInterface extends Synchronization, InternalDataView {
    * @since 5.0
    */
   public boolean needsLargeModCount();
+  
+  /*
+   * Only applicable to distributed transactions.
+   */
+  public void precommit() throws CommitConflictException, UnsupportedOperationInTransactionException;
 
   public void commit() throws CommitConflictException;
 
@@ -110,7 +116,7 @@ public interface TXStateInterface extends Synchronization, InternalDataView {
    * @param updateStats TODO
    */
   public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
-      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones);
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult);
 
   public TXEvent getEvent();
 
@@ -177,5 +183,33 @@ public interface TXStateInterface extends Synchronization, InternalDataView {
    * record a transactional operation for possible later replay
    */
   public void recordTXOperation(ServerRegionDataAccess region, ServerRegionOperation op, Object key, Object arguments[]);
-}
 
+  public void close();
+  
+  /*
+   * Determines whether this instance is a TXState.
+   */
+  public boolean isTxState();
+  
+  /*
+   * Determines whether this instance is a TXStateStub.
+   */
+  public boolean isTxStateStub();
+  
+  /*
+   * Determines whether this instance is a TXStateProxy.
+   */
+  public boolean isTxStateProxy();
+  
+  /*
+   * Whether this class relates to a distributed transaction rather than a colocated transaction.
+   */
+  public boolean isDistTx();
+  
+  /*
+   * Whether this class is meant for the coordinator of a distributed transaction.
+   * 
+   * Will be true for DistTXCoordinatorInterface
+   */
+  public boolean isCreatedOnDistTxCoordinator();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
index a65e700..948d9a0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxy.java
@@ -69,7 +69,7 @@ public interface TXStateProxy extends TXStateInterface {
    * record a client-side transactional operation for possible later replay
    */
   public void recordTXOperation(ServerRegionDataAccess proxy, ServerRegionOperation op, Object key, Object[] arguments);
-
+  
   /**
    * @return the number of operations performed in this transaction
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
index d8bfee0..f57da42 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
@@ -51,12 +51,12 @@ public class TXStateProxyImpl implements TXStateProxy {
 
   private static final Logger logger = LogService.getLogger();
   
-  private static final AtomicBoolean txDistributedClientWarningIssued = new AtomicBoolean();
+  protected static final AtomicBoolean txDistributedClientWarningIssued = new AtomicBoolean();
   
   private boolean isJTA;
   private TXId txId;
-  final private TXManagerImpl txMgr;
-  private DistributedMember target;
+  final protected TXManagerImpl txMgr;
+  protected DistributedMember target;
   private boolean commitRequestedByOwner;
   private boolean isJCATransaction;
   /**
@@ -64,7 +64,7 @@ public class TXStateProxyImpl implements TXStateProxy {
    * both beforeCompletion and afterCompletion so that beforeC can obtain
    * locks for the afterC step.  This is that thread
    */
-  private volatile TXSynchronizationRunnable synchRunnable;
+  protected volatile TXSynchronizationRunnable synchRunnable;
 
   private final ReentrantLock lock = new ReentrantLock();
 
@@ -115,9 +115,9 @@ public class TXStateProxyImpl implements TXStateProxy {
     return txMgr;
   }
 
-  private volatile TXStateInterface realDeal;
-  private boolean inProgress = true;
-  private InternalDistributedMember onBehalfOfClientMember = null;
+  protected volatile TXStateInterface realDeal;
+  protected boolean inProgress = true;
+  protected InternalDistributedMember onBehalfOfClientMember = null;
 
   /**
    * This returns either the TXState for the current transaction or
@@ -225,6 +225,14 @@ public class TXStateProxyImpl implements TXStateProxy {
     }
   }
 
+  @Override
+  public void precommit() throws CommitConflictException,
+      UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("precommit"));
+  }
+  
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
    */
@@ -328,8 +336,8 @@ public class TXStateProxyImpl implements TXStateProxy {
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
   public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
-      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones) {
-    Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false);
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
+    Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false, allowReadFromHDFS, retainResult);
     if (val != null) {
       // fixes bug 51057: TXStateStub  on client always returns null, so do not increment
       // the operation count it will be incremented in findObject()
@@ -587,11 +595,11 @@ public class TXStateProxyImpl implements TXStateProxy {
   public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
       boolean generateCallbacks, Object value, boolean disableCopyOnRead,
       boolean preferCD, ClientProxyMembershipID requestingClient,
-      EntryEventImpl clientEvent, boolean returnTombstones) {
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
     try {
       this.operationCount++;
       Object retVal = getRealDeal(key, r).findObject(key, r, isCreate, generateCallbacks,
-          value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false);
+          value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false, allowReadFromHDFS);
       trackBucketForTx(key);
       return retVal;
     } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
@@ -706,9 +714,9 @@ public class TXStateProxyImpl implements TXStateProxy {
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
    */
-  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws DataLocationException {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     this.operationCount++;
-    return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
+    return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
   }
 
   /* (non-Javadoc)
@@ -720,7 +728,7 @@ public class TXStateProxyImpl implements TXStateProxy {
       throws DataLocationException {
     this.operationCount++;
     TXStateInterface tx = getRealDeal(event.getKeyInfo(), event.getLocalRegion());
-    assert (tx instanceof TXState);
+    assert (tx instanceof TXState) : tx.getClass().getSimpleName();
     return tx.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue, requireOldValue, lastModified, overwriteDestroyed);
   }
   
@@ -986,4 +994,36 @@ public class TXStateProxyImpl implements TXStateProxy {
       throws EntryNotFoundException {
     // Do nothing. Not applicable for transactions.    
   }
+
+  
+  public void close() {
+    if (this.realDeal != null) {
+      this.realDeal.close();
+    }
+  }
+  
+  @Override
+  public boolean isTxState() {
+    return false;
+  }
+  
+  @Override
+  public boolean isTxStateStub() {
+    return false;
+  }
+  
+  @Override
+  public boolean isTxStateProxy() {
+    return true;
+  }
+  
+  @Override
+  public boolean isDistTx() {
+    return false;
+  }
+  
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
index a832d0d..cefc623 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
@@ -21,18 +21,13 @@ import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CommitConflictException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.RemoteTransactionException;
 import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
 import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
-import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
 import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Region.Entry;
 import com.gemstone.gemfire.cache.TransactionException;
 import com.gemstone.gemfire.cache.TransactionId;
 import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
 import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.ReliableReplyException;
 import com.gemstone.gemfire.distributed.internal.ReliableReplyProcessor21;
 import com.gemstone.gemfire.distributed.internal.ReplyException;
@@ -68,6 +63,14 @@ public abstract class TXStateStub implements TXStateInterface {
     this.internalAfterSendCommit = null;
   }
   
+  @Override
+  public void precommit() throws CommitConflictException,
+      UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("precommit"));
+  }
+  
   /**
    * Implemented in subclasses for Peer vs. Client
    */
@@ -174,6 +177,12 @@ public abstract class TXStateStub implements TXStateInterface {
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
   public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS,  boolean retainResult) {
+    // We never have a local value if we are a stub...
+    return null;
+  }
+
+  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
       boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones) {
     // We never have a local value if we are a stub...
     return null;
@@ -358,8 +367,8 @@ public abstract class TXStateStub implements TXStateInterface {
    */
   public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
       boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
-      EntryEventImpl clientEvent, boolean returnTombstones) {
-    return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent);
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+    return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent, allowReadFromHDFS);
   }
 
   /* (non-Javadoc)
@@ -415,7 +424,7 @@ public abstract class TXStateStub implements TXStateInterface {
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
    */
-  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
     throw new UnsupportedOperationException();
   }
 
@@ -526,4 +535,34 @@ public abstract class TXStateStub implements TXStateInterface {
       throws EntryNotFoundException {
     throw new UnsupportedOperationException();
   }
+  
+  @Override
+  public void close() {
+    // nothing needed
+  }
+  
+  @Override
+  public boolean isTxState() {
+    return false;
+  }
+  
+  @Override
+  public boolean isTxStateStub() {
+    return true;
+  }
+  
+  @Override
+  public boolean isTxStateProxy() {
+    return false;
+  }
+  
+  @Override
+  public boolean isDistTx() {
+    return false;
+  }
+  
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
index b2691a2..4c4eadf 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
@@ -74,6 +74,12 @@ public abstract class Token {
    */
   public static final NotAvailable NOT_AVAILABLE = new NotAvailable();
 
+  // !!! NOTICE !!!
+  // If you add a new Token to this class then add
+  // support in OffHeapRegionEntryHelper to encode that
+  // token as an address.
+  // See OffHeapRegionEntryHelper.objectToAddress.
+  
   /**
    * A token used to represent a value that is not a token.
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 3ad5af0..78b3215 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -981,7 +981,7 @@ public class TombstoneService  implements ResourceListener<MemoryEvent> {
   @Override
   public void onEvent(MemoryEvent event) {
     if (event.isLocal()) {
-      if (event.getType().isEvictMore()) {
+      if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
         this.replicatedTombstoneSweeper.forceBatchExpiration();
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TransactionMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TransactionMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TransactionMessage.java
index 716984c..cbd0c01 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TransactionMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TransactionMessage.java
@@ -57,4 +57,10 @@ public interface TransactionMessage {
    * e.g. <code>ManageBucketMessage</code>
    */
   public boolean canParticipateInTransaction();
+  
+  /**
+   * Messages that participate in distributed transaction return true,
+   * others return false
+   */
+  public boolean isTransactionDistributed();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
index 3de9a83..1893152 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateEntryVersionOperation.java
@@ -90,7 +90,7 @@ public class UpdateEntryVersionOperation extends DistributedCacheOperation {
         ((KeyWithRegionContext)this.key).setRegionContext(rgn);
       }
       
-      EntryEventImpl ev = new EntryEventImpl(rgn, getOperation(), this.key,
+      EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key,
          null /* newValue */, this.callbackArg /*callbackArg*/, true /* originRemote*/ , getSender(), false /*generateCallbacks*/);
       ev.setEventId(this.eventId);
       ev.setVersionTag(this.versionTag);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
index a4006c8..4dfa6e7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UpdateOperation.java
@@ -12,6 +12,7 @@ package com.gemstone.gemfire.internal.cache;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -29,11 +30,17 @@ import com.gemstone.gemfire.distributed.internal.ReplyException;
 import com.gemstone.gemfire.distributed.internal.ReplyMessage;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl.NewValueImporter;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl.SerializedCacheValueImpl;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 import com.gemstone.gemfire.internal.util.Breadcrumbs;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
 
 /**
  * Handles distribution messaging for updating an entry in a region.
@@ -85,53 +92,12 @@ public class UpdateOperation extends AbstractUpdateOperation
     m.event = ev;
     m.eventId = ev.getEventId();
     m.key = ev.getKey();
-    CachedDeserializable cd = (CachedDeserializable)ev
-        .getSerializedNewValue();
-    if (cd != null) {
-      {
-        // don't serialize here if it is not already serialized
-        Object tmp = cd.getValue();
-        if (tmp instanceof byte[]) {
-          byte[] bb = (byte[])tmp;
-          m.newValue = bb;
-          m.newValueLimit = bb.length;
-        }
-        else {
-          m.newValueObj = tmp;
-        }
-        m.deserializationPolicy = DESERIALIZATION_POLICY_LAZY;
-      }
-    }
-    else {
-      Object v = ev.getRawNewValue();
-      if (v == null) {
-        m.newValue = null;
-        m.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
-      }
-      else if (v instanceof byte[]) {
-        m.newValue = (byte[])v;
-        m.newValueLimit = m.newValue.length;
-        m.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
-      }
-      else if (ev.hasDelta()) {
-        if (ev.getCachedSerializedNewValue() != null) {
-          m.newValue = ev.getCachedSerializedNewValue();
-          m.newValueLimit = m.newValue.length;
-        } else {
-          m.newValueObj = v;
-        }
-        m.deserializationPolicy = DESERIALIZATION_POLICY_EAGER;
-      }
-      else {
-        if (ev.getCachedSerializedNewValue() != null) {
-          m.newValue = ev.getCachedSerializedNewValue();
-          m.newValueLimit = m.newValue.length;
-        } else {
-          m.newValueObj = v;
-        }
-        m.deserializationPolicy = DESERIALIZATION_POLICY_LAZY;
-      }
+    if (CachedDeserializableFactory.preferObject() || ev.hasDelta()) {
+      m.deserializationPolicy = DESERIALIZATION_POLICY_EAGER;
+    } else {
+      m.deserializationPolicy = DESERIALIZATION_POLICY_LAZY;
     }
+    ev.exportNewValue(m);
   }
 
   @Override
@@ -146,8 +112,7 @@ public class UpdateOperation extends AbstractUpdateOperation
     }
   }
 
-  public static class UpdateMessage extends AbstractUpdateMessage
-  {
+  public static class UpdateMessage extends AbstractUpdateMessage implements NewValueImporter {
 
     /**
      * Indicates if and when the new value should be deserialized on the the
@@ -163,8 +128,7 @@ public class UpdateOperation extends AbstractUpdateOperation
 
     protected byte[] newValue;
 
-    protected transient int newValueLimit; // used by toData only
-
+    @Unretained(ENTRY_EVENT_NEW_VALUE) 
     protected transient Object newValueObj;
 
     private byte[] deltaBytes;
@@ -193,7 +157,6 @@ public class UpdateOperation extends AbstractUpdateOperation
       this.key = upMsg.key;
       this.lastModified = upMsg.lastModified;
       this.newValue = upMsg.newValue;
-      this.newValueLimit = upMsg.newValueLimit;
       this.newValueObj = upMsg.newValueObj;
       this.op = upMsg.op;
       this.owner = upMsg.owner;
@@ -231,6 +194,8 @@ public class UpdateOperation extends AbstractUpdateOperation
     protected InternalCacheEvent createEvent(DistributedRegion rgn)
         throws EntryNotFoundException {
       EntryEventImpl ev = createEntryEvent(rgn);
+      boolean evReturned = false;
+      try {
       ev.setEventId(this.eventId);
       
       ev.setDeltaBytes(this.deltaBytes);
@@ -255,7 +220,13 @@ public class UpdateOperation extends AbstractUpdateOperation
       
       ev.setInhibitAllNotifications(this.inhibitAllNotifications);
       
+      evReturned = true;
       return ev;
+      } finally {
+        if (!evReturned) {
+          ev.release();
+        }
+      }
     }
     
     @Override
@@ -355,7 +326,7 @@ public class UpdateOperation extends AbstractUpdateOperation
       if (rgn.keyRequiresRegionContext()) {
         ((KeyWithRegionContext)this.key).setRegionContext(rgn);
       }
-      EntryEventImpl result = new EntryEventImpl(rgn, getOperation(), this.key,
+      EntryEventImpl result = EntryEventImpl.create(rgn, getOperation(), this.key,
           argNewValue, // oldValue,
           this.callbackArg, originRemote, getSender(), generateCallbacks);
       setOldValueInEvent(result);
@@ -439,9 +410,6 @@ public class UpdateOperation extends AbstractUpdateOperation
         else {
           this.newValue = DataSerializer.readByteArray(in);
         }
-        if (this.newValue != null) {
-          this.newValueLimit = this.newValue.length;
-        }
         if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) {
           this.deltaBytes = DataSerializer.readByteArray(in);
         }
@@ -486,28 +454,11 @@ public class UpdateOperation extends AbstractUpdateOperation
       if (hasDelta()) {
         DataSerializer.writeByteArray(this.event.getDeltaBytes(), out);
         this.event.getRegion().getCachePerfStats().incDeltasSent();
-      }
-      else {
-        if (this.newValueObj != null) {
-          byte[] newValueBytes = BlobHelper.serializeToBlob(this.newValueObj);
-          this.event.setCachedSerializedNewValue(newValueBytes);
-          // for eager deserialization avoid extra byte array serialization
-          if (this.deserializationPolicy ==
-              DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) {
-            out.write(newValueBytes);
-          }
-          else {
-            DataSerializer.writeByteArray(newValueBytes, out);
-          }
-        }
-        else {
-          if (this.deserializationPolicy ==
-              DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) {
-            out.write(this.newValue, 0, this.newValueLimit);
-          } else {
-            DataSerializer.writeByteArray(this.newValue, this.newValueLimit, out);
-          }
-        }
+      } else {
+          // TODO OFFHEAP MERGE: add a writeValue that will cache in the event like so:
+          //byte[] newValueBytes = BlobHelper.serializeToBlob(this.newValueObj);
+          //this.event.setCachedSerializedNewValue(newValueBytes);
+        DistributedCacheOperation.writeValue(this.deserializationPolicy, this.newValueObj, this.newValue, out);
         if ((extraFlags & HAS_DELTA_WITH_FULL_VALUE) != 0) {
           DataSerializer.writeByteArray(this.event.getDeltaBytes(), out);
         }
@@ -550,12 +501,8 @@ public class UpdateOperation extends AbstractUpdateOperation
           valueBytes = EntryEventImpl.serialize(this.newValueObj);
         }
       }
-      else if (this.newValue.length == this.newValueLimit) {
-        valueBytes = this.newValue;
-      }
       else {
-        valueBytes = new byte[this.newValueLimit];
-        System.arraycopy(this.newValue, 0, valueBytes, 0, valueBytes.length);
+        valueBytes = this.newValue;
       }
       return Collections.singletonList(new QueuedOperation(getOperation(),
           this.key, valueBytes, valueObj, this.deserializationPolicy,
@@ -579,6 +526,33 @@ public class UpdateOperation extends AbstractUpdateOperation
     public void setSendDeltaWithFullValue(boolean bool) {
       this.sendDeltaWithFullValue = bool;
     }
+    @Override
+    public boolean prefersNewSerialized() {
+      return true;
+    }
+    @Override
+    public boolean isUnretainedNewReferenceOk() {
+      return true;
+    }
+    @Override
+    public void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, boolean isSerialized) {
+      if (nv == null) {
+        this.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
+        this.newValue = null;
+      } else {
+        if (!isSerialized) {
+          this.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
+        }
+        this.newValueObj = nv;
+      }
+    }
+    @Override
+    public void importNewBytes(byte[] nv, boolean isSerialized) {
+      if (!isSerialized) {
+        this.deserializationPolicy = DESERIALIZATION_POLICY_NONE;
+      }
+      this.newValue = nv;
+    }
   }
 
   public static final class UpdateWithContextMessage extends UpdateMessage
@@ -598,7 +572,7 @@ public class UpdateOperation extends AbstractUpdateOperation
       if (rgn.keyRequiresRegionContext()) {
         ((KeyWithRegionContext)this.key).setRegionContext(rgn);
       }
-      EntryEventImpl ev = new EntryEventImpl(rgn, getOperation(), this.key,
+      EntryEventImpl ev = EntryEventImpl.create(rgn, getOperation(), this.key,
           argNewValue, this.callbackArg, originRemote, getSender(),
           generateCallbacks);
       ev.setContext(this.clientID);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
index 6c9f3fe..1e2850a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
@@ -59,6 +59,8 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
   private boolean hasMembershipAttributes = false;
   private boolean hasSubscriptionAttributes = false;
   private boolean hasEvictionAttributes = false;
+  private boolean hasCustomEviction = false;
+
   /**
    * Whether this region has specified a disk store name
    *
@@ -104,13 +106,24 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
    */
   private boolean hasCloningEnabled = false;
   
-  /**
+  private boolean hasHDFSStoreName = false;
+  
+  private boolean hasHDFSWriteOnly = false;
+  
+/**
    * Whether this region has entry value compression.
    * 
    * @since 8.0
    */
   private boolean hasCompressor = false;
   
+  /**
+   * Whether this region has the off-heap memory setting specified.
+   * 
+   * @since 9.0
+   */
+  private boolean hasOffHeap = false;
+  
   public boolean hasCacheLoader()
   {
     return this.hasCacheLoader;
@@ -251,8 +264,11 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
   {
     return this.hasEvictionAttributes;
   }
-  
-  
+  public boolean hasCustomEviction()
+  {
+    return this.hasCustomEviction;
+  }
+
   public boolean hasPoolName()
   {
     return this.hasPoolName;
@@ -262,6 +278,10 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
     return this.hasCompressor;
   }
   
+  public boolean hasOffHeap() {
+    return this.hasOffHeap;
+  }
+  
   public boolean hasCloningEnabled()
   {
     return this.hasCloningEnabled;
@@ -344,6 +364,10 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
   {
     this.hasEvictionAttributes = hasEvictionAttributes;
   }
+  public void setHasCustomEviction(boolean hasCustomEviction)
+  {
+    this.hasCustomEviction = hasCustomEviction;
+  }
   public void setHasIgnoreJTA(boolean hasIgnoreJTA)
   {
     this.hasIgnoreJTA = hasIgnoreJTA;
@@ -432,6 +456,10 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
     this.hasCompressor = hasCompressor;
   }
   
+  public void setHasOffHeap(boolean hasOffHeap) {
+    this.hasOffHeap = hasOffHeap;
+  }
+  
   public void setAllHasFields(boolean b) {
     int hasCounter = 0;
     Field thisFields[] = UserSpecifiedRegionAttributes.class.getDeclaredFields();
@@ -490,7 +518,7 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
   {
     this.hasDiskSynchronous = val;
   }
-  private static final int HAS_COUNT = 39;
+  private static final int HAS_COUNT = 43;
   
   public void initHasFields(UserSpecifiedRegionAttributes<K,V> other)
   {
@@ -566,4 +594,22 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
   public List getIndexes() {
     return this.indexes;
   }
+
+  public boolean hasHDFSStoreName()
+  {
+    return this.hasHDFSStoreName;
+  }
+  public void setHasHDFSStoreName(boolean val)
+  {
+    this.hasHDFSStoreName = val;
+  }
+  
+  public void setHasHDFSWriteOnly(boolean val)
+  {
+    this.hasHDFSWriteOnly = val;
+  }
+  public boolean hasHDFSWriteOnly()
+  {
+    return this.hasHDFSWriteOnly;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeap.java
index 2dbb09f..cb4ed31 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeap.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeap.java
@@ -52,5 +52,9 @@ public abstract class VMStatsDiskLRURegionEntryHeap extends VMStatsDiskLRURegion
     public RegionEntryFactory makeVersioned() {
       return VersionedStatsDiskLRURegionEntryHeap.getEntryFactory();
     }
+	@Override
+    public RegionEntryFactory makeOnHeap() {
+      return this;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapIntKey.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapIntKey.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapIntKey.java
index 5880883..591cd21 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapIntKey.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapIntKey.java
@@ -20,6 +20,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
 // lru: LRU
 // stats: STATS
 // versioned: VERSIONED
+// offheap: OFFHEAP
 // One of the following key macros must be defined:
 // key object: KEY_OBJECT
 // key int: KEY_INT
@@ -34,7 +35,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
  * that contains your build.xml.
  */
 public class VMStatsDiskLRURegionEntryHeapIntKey extends VMStatsDiskLRURegionEntryHeap {
-  public VMStatsDiskLRURegionEntryHeapIntKey (RegionEntryContext context, int key, Object value
+  public VMStatsDiskLRURegionEntryHeapIntKey (RegionEntryContext context, int key,
+      Object value
       ) {
     super(context,
           (value instanceof RecoveredEntry ? null : value)
@@ -52,10 +54,12 @@ public class VMStatsDiskLRURegionEntryHeapIntKey extends VMStatsDiskLRURegionEnt
   private static final AtomicLongFieldUpdater<VMStatsDiskLRURegionEntryHeapIntKey> lastModifiedUpdater
     = AtomicLongFieldUpdater.newUpdater(VMStatsDiskLRURegionEntryHeapIntKey.class, "lastModified");
   private volatile Object value;
-  protected final Object areGetValue() {
+  @Override
+  protected final Object getValueField() {
     return this.value;
   }
-  protected void areSetValue(Object v) {
+  @Override
+  protected void setValueField(Object v) {
     this.value = v;
   }
   protected long getlastModifiedField() {
@@ -186,8 +190,6 @@ public class VMStatsDiskLRURegionEntryHeapIntKey extends VMStatsDiskLRURegionEnt
                                                 Object value) {
     int oldSize = getEntrySize();
     int newSize = capacityController.entrySize( getKeyForSizing(), value);
-  //   GemFireCacheImpl.getInstance().getLoggerI18n().info("DEBUG updateEntrySize: oldSize=" + oldSize
-  //                                               + " newSize=" + newSize);
     setEntrySize(newSize);
     int delta = newSize - oldSize;
   //   if ( debug ) log( "updateEntrySize key=" + getKey()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapLongKey.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapLongKey.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapLongKey.java
index 15a93d2..18360cb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapLongKey.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapLongKey.java
@@ -20,6 +20,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
 // lru: LRU
 // stats: STATS
 // versioned: VERSIONED
+// offheap: OFFHEAP
 // One of the following key macros must be defined:
 // key object: KEY_OBJECT
 // key int: KEY_INT
@@ -34,7 +35,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
  * that contains your build.xml.
  */
 public class VMStatsDiskLRURegionEntryHeapLongKey extends VMStatsDiskLRURegionEntryHeap {
-  public VMStatsDiskLRURegionEntryHeapLongKey (RegionEntryContext context, long key, Object value
+  public VMStatsDiskLRURegionEntryHeapLongKey (RegionEntryContext context, long key,
+      Object value
       ) {
     super(context,
           (value instanceof RecoveredEntry ? null : value)
@@ -52,10 +54,12 @@ public class VMStatsDiskLRURegionEntryHeapLongKey extends VMStatsDiskLRURegionEn
   private static final AtomicLongFieldUpdater<VMStatsDiskLRURegionEntryHeapLongKey> lastModifiedUpdater
     = AtomicLongFieldUpdater.newUpdater(VMStatsDiskLRURegionEntryHeapLongKey.class, "lastModified");
   private volatile Object value;
-  protected final Object areGetValue() {
+  @Override
+  protected final Object getValueField() {
     return this.value;
   }
-  protected void areSetValue(Object v) {
+  @Override
+  protected void setValueField(Object v) {
     this.value = v;
   }
   protected long getlastModifiedField() {
@@ -186,8 +190,6 @@ public class VMStatsDiskLRURegionEntryHeapLongKey extends VMStatsDiskLRURegionEn
                                                 Object value) {
     int oldSize = getEntrySize();
     int newSize = capacityController.entrySize( getKeyForSizing(), value);
-  //   GemFireCacheImpl.getInstance().getLoggerI18n().info("DEBUG updateEntrySize: oldSize=" + oldSize
-  //                                               + " newSize=" + newSize);
     setEntrySize(newSize);
     int delta = newSize - oldSize;
   //   if ( debug ) log( "updateEntrySize key=" + getKey()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapObjectKey.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapObjectKey.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapObjectKey.java
index 02a150f..be4e866 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapObjectKey.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapObjectKey.java
@@ -20,6 +20,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
 // lru: LRU
 // stats: STATS
 // versioned: VERSIONED
+// offheap: OFFHEAP
 // One of the following key macros must be defined:
 // key object: KEY_OBJECT
 // key int: KEY_INT
@@ -34,7 +35,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
  * that contains your build.xml.
  */
 public class VMStatsDiskLRURegionEntryHeapObjectKey extends VMStatsDiskLRURegionEntryHeap {
-  public VMStatsDiskLRURegionEntryHeapObjectKey (RegionEntryContext context, Object key, Object value
+  public VMStatsDiskLRURegionEntryHeapObjectKey (RegionEntryContext context, Object key,
+      Object value
       ) {
     super(context,
           (value instanceof RecoveredEntry ? null : value)
@@ -52,10 +54,12 @@ public class VMStatsDiskLRURegionEntryHeapObjectKey extends VMStatsDiskLRURegion
   private static final AtomicLongFieldUpdater<VMStatsDiskLRURegionEntryHeapObjectKey> lastModifiedUpdater
     = AtomicLongFieldUpdater.newUpdater(VMStatsDiskLRURegionEntryHeapObjectKey.class, "lastModified");
   private volatile Object value;
-  protected final Object areGetValue() {
+  @Override
+  protected final Object getValueField() {
     return this.value;
   }
-  protected void areSetValue(Object v) {
+  @Override
+  protected void setValueField(Object v) {
     this.value = v;
   }
   protected long getlastModifiedField() {
@@ -186,8 +190,6 @@ public class VMStatsDiskLRURegionEntryHeapObjectKey extends VMStatsDiskLRURegion
                                                 Object value) {
     int oldSize = getEntrySize();
     int newSize = capacityController.entrySize( getKeyForSizing(), value);
-  //   GemFireCacheImpl.getInstance().getLoggerI18n().info("DEBUG updateEntrySize: oldSize=" + oldSize
-  //                                               + " newSize=" + newSize);
     setEntrySize(newSize);
     int delta = newSize - oldSize;
   //   if ( debug ) log( "updateEntrySize key=" + getKey()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey1.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey1.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey1.java
index b2ae8a5..922b585 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey1.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey1.java
@@ -20,6 +20,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
 // lru: LRU
 // stats: STATS
 // versioned: VERSIONED
+// offheap: OFFHEAP
 // One of the following key macros must be defined:
 // key object: KEY_OBJECT
 // key int: KEY_INT
@@ -34,7 +35,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
  * that contains your build.xml.
  */
 public class VMStatsDiskLRURegionEntryHeapStringKey1 extends VMStatsDiskLRURegionEntryHeap {
-  public VMStatsDiskLRURegionEntryHeapStringKey1 (RegionEntryContext context, String key, Object value
+  public VMStatsDiskLRURegionEntryHeapStringKey1 (RegionEntryContext context, String key,
+      Object value
       , boolean byteEncode
       ) {
     super(context,
@@ -69,10 +71,12 @@ public class VMStatsDiskLRURegionEntryHeapStringKey1 extends VMStatsDiskLRURegio
   private static final AtomicLongFieldUpdater<VMStatsDiskLRURegionEntryHeapStringKey1> lastModifiedUpdater
     = AtomicLongFieldUpdater.newUpdater(VMStatsDiskLRURegionEntryHeapStringKey1.class, "lastModified");
   private volatile Object value;
-  protected final Object areGetValue() {
+  @Override
+  protected final Object getValueField() {
     return this.value;
   }
-  protected void areSetValue(Object v) {
+  @Override
+  protected void setValueField(Object v) {
     this.value = v;
   }
   protected long getlastModifiedField() {
@@ -203,8 +207,6 @@ public class VMStatsDiskLRURegionEntryHeapStringKey1 extends VMStatsDiskLRURegio
                                                 Object value) {
     int oldSize = getEntrySize();
     int newSize = capacityController.entrySize( getKeyForSizing(), value);
-  //   GemFireCacheImpl.getInstance().getLoggerI18n().info("DEBUG updateEntrySize: oldSize=" + oldSize
-  //                                               + " newSize=" + newSize);
     setEntrySize(newSize);
     int delta = newSize - oldSize;
   //   if ( debug ) log( "updateEntrySize key=" + getKey()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey2.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey2.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey2.java
index 48e08f4..5897b0a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey2.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapStringKey2.java
@@ -20,6 +20,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
 // lru: LRU
 // stats: STATS
 // versioned: VERSIONED
+// offheap: OFFHEAP
 // One of the following key macros must be defined:
 // key object: KEY_OBJECT
 // key int: KEY_INT
@@ -34,7 +35,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
  * that contains your build.xml.
  */
 public class VMStatsDiskLRURegionEntryHeapStringKey2 extends VMStatsDiskLRURegionEntryHeap {
-  public VMStatsDiskLRURegionEntryHeapStringKey2 (RegionEntryContext context, String key, Object value
+  public VMStatsDiskLRURegionEntryHeapStringKey2 (RegionEntryContext context, String key,
+      Object value
       , boolean byteEncode
       ) {
     super(context,
@@ -81,10 +83,12 @@ public class VMStatsDiskLRURegionEntryHeapStringKey2 extends VMStatsDiskLRURegio
   private static final AtomicLongFieldUpdater<VMStatsDiskLRURegionEntryHeapStringKey2> lastModifiedUpdater
     = AtomicLongFieldUpdater.newUpdater(VMStatsDiskLRURegionEntryHeapStringKey2.class, "lastModified");
   private volatile Object value;
-  protected final Object areGetValue() {
+  @Override
+  protected final Object getValueField() {
     return this.value;
   }
-  protected void areSetValue(Object v) {
+  @Override
+  protected void setValueField(Object v) {
     this.value = v;
   }
   protected long getlastModifiedField() {
@@ -215,8 +219,6 @@ public class VMStatsDiskLRURegionEntryHeapStringKey2 extends VMStatsDiskLRURegio
                                                 Object value) {
     int oldSize = getEntrySize();
     int newSize = capacityController.entrySize( getKeyForSizing(), value);
-  //   GemFireCacheImpl.getInstance().getLoggerI18n().info("DEBUG updateEntrySize: oldSize=" + oldSize
-  //                                               + " newSize=" + newSize);
     setEntrySize(newSize);
     int delta = newSize - oldSize;
   //   if ( debug ) log( "updateEntrySize key=" + getKey()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapUUIDKey.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapUUIDKey.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapUUIDKey.java
index 56e9b2a..bf4307e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapUUIDKey.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryHeapUUIDKey.java
@@ -21,6 +21,7 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
 // lru: LRU
 // stats: STATS
 // versioned: VERSIONED
+// offheap: OFFHEAP
 // One of the following key macros must be defined:
 // key object: KEY_OBJECT
 // key int: KEY_INT
@@ -35,7 +36,8 @@ import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMa
  * that contains your build.xml.
  */
 public class VMStatsDiskLRURegionEntryHeapUUIDKey extends VMStatsDiskLRURegionEntryHeap {
-  public VMStatsDiskLRURegionEntryHeapUUIDKey (RegionEntryContext context, UUID key, Object value
+  public VMStatsDiskLRURegionEntryHeapUUIDKey (RegionEntryContext context, UUID key,
+      Object value
       ) {
     super(context,
           (value instanceof RecoveredEntry ? null : value)
@@ -54,10 +56,12 @@ public class VMStatsDiskLRURegionEntryHeapUUIDKey extends VMStatsDiskLRURegionEn
   private static final AtomicLongFieldUpdater<VMStatsDiskLRURegionEntryHeapUUIDKey> lastModifiedUpdater
     = AtomicLongFieldUpdater.newUpdater(VMStatsDiskLRURegionEntryHeapUUIDKey.class, "lastModified");
   private volatile Object value;
-  protected final Object areGetValue() {
+  @Override
+  protected final Object getValueField() {
     return this.value;
   }
-  protected void areSetValue(Object v) {
+  @Override
+  protected void setValueField(Object v) {
     this.value = v;
   }
   protected long getlastModifiedField() {
@@ -188,8 +192,6 @@ public class VMStatsDiskLRURegionEntryHeapUUIDKey extends VMStatsDiskLRURegionEn
                                                 Object value) {
     int oldSize = getEntrySize();
     int newSize = capacityController.entrySize( getKeyForSizing(), value);
-  //   GemFireCacheImpl.getInstance().getLoggerI18n().info("DEBUG updateEntrySize: oldSize=" + oldSize
-  //                                               + " newSize=" + newSize);
     setEntrySize(newSize);
     int delta = newSize - oldSize;
   //   if ( debug ) log( "updateEntrySize key=" + getKey()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryOffHeap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryOffHeap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryOffHeap.java
new file mode 100644
index 0000000..c707472
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/VMStatsDiskLRURegionEntryOffHeap.java
@@ -0,0 +1,60 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.UUID;
+
+public abstract class VMStatsDiskLRURegionEntryOffHeap extends VMStatsDiskLRURegionEntry implements OffHeapRegionEntry {
+  public VMStatsDiskLRURegionEntryOffHeap(RegionEntryContext context, Object value) {
+    super(context, value);
+  }
+  private static final VMStatsDiskLRURegionEntryOffHeapFactory factory = new VMStatsDiskLRURegionEntryOffHeapFactory();
+  
+  public static RegionEntryFactory getEntryFactory() {
+    return factory;
+  }
+  private static class VMStatsDiskLRURegionEntryOffHeapFactory implements RegionEntryFactory {
+    public final RegionEntry createEntry(RegionEntryContext context, Object key, Object value) {
+      if (InlineKeyHelper.INLINE_REGION_KEYS) {
+        Class<?> keyClass = key.getClass();
+        if (keyClass == Integer.class) {
+          return new VMStatsDiskLRURegionEntryOffHeapIntKey(context, (Integer)key, value);
+        } else if (keyClass == Long.class) {
+          return new VMStatsDiskLRURegionEntryOffHeapLongKey(context, (Long)key, value);
+        } else if (keyClass == String.class) {
+          final String skey = (String) key;
+          final Boolean info = InlineKeyHelper.canStringBeInlineEncoded(skey);
+          if (info != null) {
+            final boolean byteEncoded = info;
+            if (skey.length() <= InlineKeyHelper.getMaxInlineStringKey(1, byteEncoded)) {
+              return new VMStatsDiskLRURegionEntryOffHeapStringKey1(context, skey, value, byteEncoded);
+            } else {
+              return new VMStatsDiskLRURegionEntryOffHeapStringKey2(context, skey, value, byteEncoded);
+            }
+          }
+        } else if (keyClass == UUID.class) {
+          return new VMStatsDiskLRURegionEntryOffHeapUUIDKey(context, (UUID)key, value);
+        }
+      }
+      return new VMStatsDiskLRURegionEntryOffHeapObjectKey(context, key, value);
+    }
+
+    public final Class getEntryClass() {
+      // The class returned from this method is used to estimate the memory size.
+      // TODO OFFHEAP: This estimate will not take into account the memory saved by inlining the keys.
+      return VMStatsDiskLRURegionEntryOffHeapObjectKey.class;
+    }
+    public RegionEntryFactory makeVersioned() {
+      return VersionedStatsDiskLRURegionEntryOffHeap.getEntryFactory();
+    }
+	@Override
+    public RegionEntryFactory makeOnHeap() {
+      return VMStatsDiskLRURegionEntryHeap.getEntryFactory();
+    }
+  }
+}


Mime
View raw message