geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [18/53] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Mon, 06 Jul 2015 18:15:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchEntryMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchEntryMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchEntryMessage.java
index 853052c..d6edb3f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchEntryMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchEntryMessage.java
@@ -127,7 +127,7 @@ public final class RemoteFetchEntryMessage extends RemoteOperationMessage
         final KeyInfo keyInfo = r.getKeyInfo(key);
         Region.Entry re = r.getDataView().getEntry(keyInfo, r, true);
         if(re==null) {
-          throw new EntryNotFoundException(key.toString());
+          r.checkEntryNotFound(key);
         }
         NonLocalRegionEntry nlre = new NonLocalRegionEntry(re, r);
         LocalRegion dataReg = r.getDataRegionForRead(keyInfo);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchVersionMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchVersionMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchVersionMessage.java
index a4a5ba4..5c5904c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchVersionMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteFetchVersionMessage.java
@@ -123,7 +123,7 @@ public final class RemoteFetchVersionMessage extends RemoteOperationMessage {
         if (logger.isTraceEnabled(LogMarker.DM)) {
           logger.trace(LogMarker.DM,"RemoteFetchVersionMessage did not find entry for key:{}", key);
         }
-        throw new EntryNotFoundException(key.toString());
+        r.checkEntryNotFound(key);
       }
       tag = re.getVersionStamp().asVersionTag();
       if (logger.isTraceEnabled(LogMarker.DM)) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
index 488daf1..fcd79a5 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
@@ -38,6 +38,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 
 /**
@@ -106,13 +107,26 @@ public final class RemoteGetMessage extends RemoteOperationMessageWithDirectRepl
     }
 
     RawValue valueBytes;
+    Object val = null;
       try {
         if (r.keyRequiresRegionContext()) {
           ((KeyWithRegionContext)this.key).setRegionContext(r);
         }
         KeyInfo keyInfo = r.getKeyInfo(key, cbArg);
-        Object val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false);
+        val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false, false/*for replicate regions*/);
         valueBytes = val instanceof RawValue ? (RawValue)val : new RawValue(val);
+
+        if (logger.isTraceEnabled(LogMarker.DM)) {
+          logger.trace(LogMarker.DM, "GetMessage sending serialized value {} back via GetReplyMessage using processorId: {}",
+                       valueBytes, getProcessorId());
+        }
+      
+        //      r.getPrStats().endPartitionMessagesProcessing(startTime); 
+        GetReplyMessage.send(getSender(), getProcessorId(), valueBytes, getReplySender(dm));
+
+        // Unless there was an exception thrown, this message handles sending the
+        // response
+        return false;
       } 
       catch(DistributedSystemDisconnectedException sde) {
         sendReply(getSender(), this.processorId, dm, new ReplyException(new RemoteOperationException(LocalizedStrings.GetMessage_OPERATION_GOT_INTERRUPTED_DUE_TO_SHUTDOWN_IN_PROGRESS_ON_REMOTE_VM.toLocalizedString(), sde)), r, startTime);
@@ -125,19 +139,10 @@ public final class RemoteGetMessage extends RemoteOperationMessageWithDirectRepl
       catch (DataLocationException e) {
         sendReply(getSender(), getProcessorId(), dm, new ReplyException(e), r, startTime);
         return false;
+      }finally {
+        OffHeapHelper.release(val);
       }
 
-      if (logger.isTraceEnabled(LogMarker.DM)) {
-        logger.trace(LogMarker.DM, "GetMessage sending serialized value {} back via GetReplyMessage using processorId: {}",
-            valueBytes, getProcessorId());
-      }
-      
-//      r.getPrStats().endPartitionMessagesProcessing(startTime); 
-      GetReplyMessage.send(getSender(), getProcessorId(), valueBytes, getReplySender(dm));
-
-    // Unless there was an exception thrown, this message handles sending the
-    // response
-    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
index 12cfe55..7039358 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteInvalidateMessage.java
@@ -43,6 +43,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES;
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_SERIALIZED_OBJECT;
 
 public final class RemoteInvalidateMessage extends RemoteDestroyMessage {
 
@@ -149,6 +151,7 @@ public final class RemoteInvalidateMessage extends RemoteDestroyMessage {
     InvalidateResponse p = new InvalidateResponse(r.getSystem(), recipients, event.getKey());
     RemoteInvalidateMessage m = new RemoteInvalidateMessage(recipients,
         r.getFullPath(), p, event, useOriginRemote, possibleDuplicate);
+    m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
     Set failures =r.getDistributionManager().putOutgoing(m); 
     if (failures != null && failures.size() > 0 ) {
       throw new RemoteOperationException(LocalizedStrings.InvalidateMessage_FAILED_SENDING_0.toLocalizedString(m));
@@ -179,7 +182,7 @@ public final class RemoteInvalidateMessage extends RemoteDestroyMessage {
     if (r.keyRequiresRegionContext()) {
       ((KeyWithRegionContext)key).setRegionContext(r);
     }
-    final EntryEventImpl event = new EntryEventImpl(
+    final EntryEventImpl event = EntryEventImpl.create(
         r,
         getOperation(),
         key,
@@ -189,6 +192,7 @@ public final class RemoteInvalidateMessage extends RemoteDestroyMessage {
         eventSender,
         true/*generateCallbacks*/,
         false/*initializeId*/);
+    try {
     if (this.bridgeContext != null) {
       event.setContext(this.bridgeContext);
     }
@@ -240,6 +244,9 @@ public final class RemoteInvalidateMessage extends RemoteDestroyMessage {
       }
 
     return sendReply;
+    } finally {
+      event.release();
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
index 4c06c42..ae72345 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteOperationMessage.java
@@ -36,6 +36,8 @@ import com.gemstone.gemfire.distributed.internal.ReplyMessage;
 import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
@@ -70,6 +72,9 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
   private InternalDistributedMember txMemberId = null;
 
   protected transient short flags;
+  
+  /*TODO [DISTTX] Convert into flag*/
+  protected boolean isTransactionDistributed = false;
 
   public RemoteOperationMessage() {
   }
@@ -89,6 +94,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
     if(txState!=null && txState.isMemberIdForwardingRequired()) {
       this.txMemberId = txState.getOriginatingMember();
     }
+    setIfTransactionDistributed();
   }
 
   public RemoteOperationMessage(Set recipients, String regionPath, ReplyProcessor21 processor) {
@@ -103,6 +109,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
     if(txState!=null && txState.isMemberIdForwardingRequired()) {
       this.txMemberId = txState.getOriginatingMember();
     }
+    setIfTransactionDistributed();
   }
 
   /**
@@ -114,6 +121,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
     this.processorId = other.processorId;
     this.txUniqId = other.getTXUniqId();
     this.txMemberId = other.getTXMemberId();
+    this.isTransactionDistributed = other.isTransactionDistributed;
   }
 
   /**
@@ -332,19 +340,23 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
   protected abstract boolean operateOnRegion(DistributionManager dm,
       LocalRegion r,long startTime) throws RemoteOperationException;
 
-
   /**
    * Fill out this instance of the message using the <code>DataInput</code>
    * Required to be a {@link com.gemstone.gemfire.DataSerializable}Note: must
    * be symmetric with {@link #toData(DataOutput)}in what it reads
    */
   @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException
-  {
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     super.fromData(in);
     this.flags = in.readShort();
     setFlags(this.flags, in);
     this.regionPath = DataSerializer.readString(in);
+
+    // extra field post 9.0
+    if (InternalDataSerializer.getVersionForDataStream(in).compareTo(
+        Version.GFE_90) >= 0) {
+      this.isTransactionDistributed = in.readBoolean();
+    }
   }
 
   public InternalDistributedMember getTXOriginatorClient() {
@@ -357,8 +369,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
    * {@link #fromData(DataInput)}in what it writes
    */
   @Override
-  public void toData(DataOutput out) throws IOException
-  {
+  public void toData(DataOutput out) throws IOException {
     super.toData(out);
     short flags = computeCompressedShort();
     out.writeShort(flags);
@@ -375,6 +386,12 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
       DataSerializer.writeObject(this.getTXMemberId(),out);
     }
     DataSerializer.writeString(this.regionPath,out);
+
+    // extra field post 9.0
+    if (InternalDataSerializer.getVersionForDataStream(out).compareTo(
+        Version.GFE_90) >= 0) {
+      out.writeBoolean(this.isTransactionDistributed);
+    }
   }
 
   protected short computeCompressedShort() {
@@ -613,4 +630,28 @@ public abstract class RemoteOperationMessage extends DistributionMessage impleme
       super.process(msg);
     }
   }
+  
+  @Override
+  public boolean isTransactionDistributed() {
+    return this.isTransactionDistributed;
+  }
+  
+  /*
+   * For Distributed Tx
+   */
+  public void setTransactionDistributed(boolean isDistTx) {
+   this.isTransactionDistributed = isDistTx;
+  }
+  
+  /*
+   * For Distributed Tx
+   */
+  private void setIfTransactionDistributed() {
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    if (cache != null) {
+      if (cache.getTxManager() != null) {
+        this.isTransactionDistributed = cache.getTxManager().isDistributed();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
index 921075f..7087b56 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutAllMessage.java
@@ -72,6 +72,7 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
 
   protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
   protected static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1);
+  protected static final short IS_PUT_DML = (SKIP_CALLBACKS << 1);
 
   private EventID eventId;
   
@@ -80,6 +81,8 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
   private Object callbackArg;
 
 //  private boolean useOriginRemote;
+
+  private boolean isPutDML;
   
   public void addEntry(PutAllEntryData entry) {
     this.putAllData[this.putAllDataCount++] = entry;
@@ -177,6 +180,7 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
     this.eventId = event.getEventId();
     this.skipCallbacks = skipCallbacks;
     this.callbackArg = event.getCallbackArgument();
+	this.isPutDML = event.isPutDML();
   }
 
   public RemotePutAllMessage() {
@@ -199,6 +203,7 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
     PutAllResponse p = new PutAllResponse(event.getRegion().getSystem(), recipients);
     RemotePutAllMessage msg = new RemotePutAllMessage(event, recipients, p,
         putAllData, putAllDataCount, useOriginRemote, processorType, possibleDuplicate, !event.isGenerateCallbacks());
+    msg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
     Set failures = event.getRegion().getDistributionManager().putOutgoing(msg);
     if (failures != null && failures.size() > 0) {
       throw new RemoteOperationException(LocalizedStrings.RemotePutMessage_FAILED_SENDING_0.toLocalizedString(msg));
@@ -229,6 +234,7 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
       this.bridgeContext = DataSerializer.readObject(in);
     }
     this.skipCallbacks = (flags & SKIP_CALLBACKS) != 0;
+    this.isPutDML = (flags & IS_PUT_DML) != 0;
     this.putAllDataCount = (int)InternalDataSerializer.readUnsignedVL(in);
     this.putAllData = new PutAllEntryData[putAllDataCount];
     if (this.putAllDataCount > 0) {
@@ -297,6 +303,7 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
     if (this.posDup) flags |= POS_DUP;
     if (this.bridgeContext != null) flags |= HAS_BRIDGE_CONTEXT;
     if (this.skipCallbacks) flags |= SKIP_CALLBACKS;
+    if (this.isPutDML) flags |= IS_PUT_DML;
     return flags;
   }
 
@@ -346,9 +353,10 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
     final DistributedRegion dr = (DistributedRegion)r;
     
     // create a base event and a DPAO for PutAllMessage distributed btw redundant buckets
-    EntryEventImpl baseEvent = new EntryEventImpl(
+    EntryEventImpl baseEvent = EntryEventImpl.create(
         r, Operation.PUTALL_CREATE,
         null, null, this.callbackArg, false, eventSender, !skipCallbacks);
+    try {
 
     baseEvent.setCausedByMessage(this);
     
@@ -358,11 +366,13 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
       baseEvent.setContext(this.bridgeContext);
     }
     baseEvent.setPossibleDuplicate(this.posDup);
+	baseEvent.setPutDML(this.isPutDML);
     if (logger.isDebugEnabled()) {
       logger.debug("RemotePutAllMessage.doLocalPutAll: eventSender is {}, baseEvent is {}, msg is {}",
           eventSender, baseEvent, this);
     }
     final DistributedPutAllOperation dpao = new DistributedPutAllOperation(baseEvent, putAllDataCount, false);
+    try {
     final VersionedObjectList versions = new VersionedObjectList(putAllDataCount, true, dr.concurrencyChecksEnabled);
     dr.syncBulkOp(new Runnable() {
       @SuppressWarnings("synthetic-access")
@@ -370,7 +380,8 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
 //        final boolean requiresRegionContext = dr.keyRequiresRegionContext();
         InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
         for (int i = 0; i < putAllDataCount; ++i) {
-          EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(r, myId, eventSender, i, putAllData, false, bridgeContext, posDup, !skipCallbacks);
+          EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(r, myId, eventSender, i, putAllData, false, bridgeContext, posDup, !skipCallbacks, isPutDML);
+          try {
           ev.setPutAllOperation(dpao);
           if (logger.isDebugEnabled()) {
             logger.debug("invoking basicPut with {}", ev);
@@ -379,6 +390,9 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
             putAllData[i].versionTag = ev.getVersionTag();
             versions.addKeyAndVersion(putAllData[i].key, ev.getVersionTag());
           }
+          } finally {
+            ev.release();
+          }
         }
       }
     }, baseEvent.getEventId());
@@ -388,6 +402,12 @@ public final class RemotePutAllMessage extends RemoteOperationMessageWithDirectR
     PutAllReplyMessage.send(getSender(), this.processorId, 
         getReplySender(r.getDistributionManager()), versions, this.putAllData, this.putAllDataCount);
     return false;
+    } finally {
+      dpao.freeOffHeapResources();
+    }
+    } finally {
+      baseEvent.release();
+    }
   }
 
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
index 173a816..3f5efeb 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemotePutMessage.java
@@ -38,6 +38,8 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl.NewValueImporter;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
@@ -45,19 +47,17 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 import com.gemstone.gemfire.internal.util.Breadcrumbs;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
+
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES;
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_SERIALIZED_OBJECT;
+import static com.gemstone.gemfire.internal.cache.DistributedCacheOperation.VALUE_IS_OBJECT;
 
 /**
  * A Replicate Region update message.  Meant to be sent only to
@@ -66,7 +66,7 @@ import java.util.Set;
  * @since 6.5
  */
 public final class RemotePutMessage extends RemoteOperationMessageWithDirectReply
-  {
+  implements NewValueImporter, OldValueImporter {
   private static final Logger logger = LogService.getLogger();
   
   private static final short FLAG_IFNEW = 0x1;
@@ -97,8 +97,10 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
 
   /** Used on sender side only to defer serialization until toData is called.
    */
+  @Unretained(ENTRY_EVENT_NEW_VALUE) 
   private transient Object valObj;
 
+  @Unretained(ENTRY_EVENT_OLD_VALUE) 
   private transient Object oldValObj;
 
   /** The callback arg of the operation */
@@ -241,47 +243,12 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
     event.setOriginRemote(useOriginRemote);
 
     if (event.hasNewValue()) {
-      CachedDeserializable cd = (CachedDeserializable) event.getSerializedNewValue();
-      if (cd != null) {
-        {
-          this.deserializationPolicy =
-            DistributedCacheOperation.DESERIALIZATION_POLICY_LAZY;
-          Object v = cd.getValue();
-          if (v instanceof byte[]) {
-            setValBytes((byte[])v);
-          }
-          else {
-            // Defer serialization until toData is called.
-            setValObj(v);
-          }
-        }
-      }
-      else {
-        Object v = event.getRawNewValue();
-        if (v instanceof byte[]) {
-          this.deserializationPolicy =
-            DistributedCacheOperation.DESERIALIZATION_POLICY_NONE;
-          setValBytes((byte[]) v);
-        }
-        else if (event.hasDelta()) {
-          this.deserializationPolicy =
-            DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER;
-          if (event.getCachedSerializedNewValue() != null) {
-            setValBytes(event.getCachedSerializedNewValue());
-          } else {
-            setValObj(v);
-          }
-        }
-        else {
-          this.deserializationPolicy =
-            DistributedCacheOperation.DESERIALIZATION_POLICY_LAZY;
-          if (event.getCachedSerializedNewValue() != null) {
-            setValBytes(event.getCachedSerializedNewValue());
-          } else {
-            setValObj(v);
-          }
-        }
+      if (CachedDeserializableFactory.preferObject() || event.hasDelta()) {
+        this.deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER;
+      } else {
+        this.deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_LAZY;
       }
+      event.exportNewValue(this);
     }
     else {
       // assert that if !event.hasNewValue, then deserialization policy is NONE
@@ -295,28 +262,7 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
 
     if (event.hasOldValue()) {
       this.hasOldValue = true;
-      CachedDeserializable cd = (CachedDeserializable) event.getSerializedOldValue();
-      if (cd != null) {
-        {
-          this.oldValueIsSerialized = true;
-          Object o = cd.getValue();
-          if (o instanceof byte[]) {
-            setOldValBytes((byte[])o);
-          } else {
-            // Defer serialization until toData is called.
-            setOldValObj(o);
-          }
-        }
-      } else {
-        Object old = event.getRawOldValue();
-        if (old instanceof byte[]) {
-          this.oldValueIsSerialized = false;
-          setOldValBytes((byte[]) old);
-        } else {
-          this.oldValueIsSerialized = true;
-          setOldValObj(old);
-        }
-      }
+      event.exportOldValue(this);
     }
 
     this.event = event;
@@ -490,7 +436,8 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
                                   processorType, possibleDuplicate);
     m.setInternalDs(r.getSystem());
     m.setSendDelta(true);
-
+    m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
+    
     processor.setRemotePutMessage(m);
 
     Set failures =r.getDistributionManager().putOutgoing(m);
@@ -544,11 +491,11 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
     return this.oldValObj;
   }
 
-  private void setValObj(Object o) {
+  private void setValObj(@Unretained(ENTRY_EVENT_NEW_VALUE) Object o) {
     this.valObj = o;
   }
 
-  private void setOldValObj(Object o){
+  private void setOldValObj(@Unretained(ENTRY_EVENT_OLD_VALUE) Object o){
     this.oldValObj = o;
   }
 
@@ -679,26 +626,10 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
     // this will be on wire for cqs old value generations.
     if (this.hasOldValue) {
       out.writeByte(this.oldValueIsSerialized ? 1 : 0);
-      if (this.oldValueIsSerialized) {
-        DataSerializer.writeObjectAsByteArray(getOldValObj(), out);
-      }
-      else {
-        DataSerializer.writeByteArray(getOldValueBytes(), out);
-      }
-    }
-    byte[] newValBytes = null;
-    if (this.valObj != null) {
-      newValBytes = BlobHelper.serializeToBlob(this.valObj);
-      this.event.setCachedSerializedNewValue(newValBytes);
-    }
-    else {
-      newValBytes = getValBytes();
-    }
-    if (this.deserializationPolicy == DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) {
-      out.write(newValBytes);
-    } else {
-      DataSerializer.writeByteArray(newValBytes, out);
+      byte policy = DistributedCacheOperation.valueIsToDeserializationPolicy(oldValueIsSerialized);
+      DistributedCacheOperation.writeValue(policy, getOldValObj(), getOldValueBytes(), out);
     }
+    DistributedCacheOperation.writeValue(this.deserializationPolicy, this.valObj, getValBytes(), out);
     if (this.event.getDeltaBytes() != null) {
       DataSerializer.writeByteArray(this.event.getDeltaBytes(), out);
     }
@@ -744,7 +675,7 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
     if (r.keyRequiresRegionContext()) {
       ((KeyWithRegionContext)this.key).setRegionContext(r);
     }
-    this.event = new EntryEventImpl(
+    this.event = EntryEventImpl.create(
         r,
         getOperation(),
         getKey(),
@@ -754,6 +685,7 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
         eventSender,
         true/*generateCallbacks*/,
         false/*initializeId*/);
+    try {
     if (this.versionTag != null) {
       this.versionTag.replaceNullIDs(getSender());
       event.setVersionTag(this.versionTag);
@@ -830,23 +762,18 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
     setOperation(event.getOperation()); // set operation for reply message
 
     if (sendReply) {
-      Object oldValue = null;
-      if (this.requireOldValue) {
-        oldValue = event.getSerializedOldValue();
-        if (oldValue == null) {
-          oldValue = event.getRawOldValue();
-        }
-      }
       sendReply(getSender(),
                 getProcessorId(),
                 dm,
                 null,
                 r,
                 startTime,
-                oldValue,
                 event);
     }
     return false;
+    } finally {
+      this.event.release(); // OFFHEAP this may be too soon to make this call
+    }
   }
 
 
@@ -856,7 +783,6 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
                            ReplyException ex,
                            LocalRegion pr,
                            long startTime,
-                           Object oldValue,
                            EntryEventImpl event) {
     Collection distributedTo = null;
     if (this.processorId != 0) {
@@ -865,9 +791,17 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
       // membership view than this VM
       distributedTo = this.cacheOpRecipients;
     }
-    PutReplyMessage.send(member, procId, getReplySender(dm), result, getOperation(), ex, oldValue, event);
+    PutReplyMessage.send(member, procId, getReplySender(dm), result,
+        getOperation(), ex, this, event);
   }
 
+  // override reply message type from PartitionMessage
+  @Override
+  protected void sendReply(InternalDistributedMember member, int procId, DM dm,
+      ReplyException ex, LocalRegion pr, long startTime) {
+    PutReplyMessage.send(member, procId, getReplySender(dm), result,
+        getOperation(), ex, this, null);
+  }
 
   @Override
   protected final void appendFields(StringBuffer buff)
@@ -921,7 +855,8 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
     this.internalDs = internalDs;
   }
 
-  public static final class PutReplyMessage extends ReplyMessage {
+  public static final class PutReplyMessage extends ReplyMessage implements OldValueImporter {
+
     static final byte FLAG_RESULT = 0x01;
     static final byte FLAG_HASVERSION = 0x02;
     static final byte FLAG_PERSISTENT = 0x04;
@@ -933,9 +868,18 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
     Operation op;
 
     /**
+     * Set to true by the import methods if the oldValue
+     * is already serialized. In that case toData
+     * should just copy the bytes to the stream.
+     * In either case fromData just calls readObject.
+     */
+    private transient boolean oldValueIsSerialized;
+    
+    /**
      * Old value in serialized form: either a byte[] or CachedDeserializable,
      * or null if not set.
      */
+    @Unretained(ENTRY_EVENT_OLD_VALUE)
     Object oldValue;
     
     /**
@@ -976,9 +920,15 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
                             boolean result,
                             Operation op,
                             ReplyException ex,
-                            Object oldValue, EntryEventImpl event) {
+                            RemotePutMessage sourceMessage,
+                            EntryEventImpl event) {
       Assert.assertTrue(recipient != null, "PutReplyMessage NULL recipient");
-      PutReplyMessage m = new PutReplyMessage(processorId, result, op, ex, oldValue, event.getVersionTag());
+      PutReplyMessage m = new PutReplyMessage(processorId, result, op, ex, null, event != null ? event.getVersionTag() : null);
+      
+      if (sourceMessage.requireOldValue && event != null) {
+        event.exportOldValue(m);
+      }
+
       m.setRecipient(recipient);
       dm.putOutgoing(m);
     }
@@ -1053,7 +1003,12 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
       if (this.versionTag instanceof DiskVersionTag) flags |= FLAG_PERSISTENT;
       out.writeByte(flags);
       out.writeByte(this.op.ordinal);
-      DataSerializer.writeObject(this.oldValue, out);
+      if (this.oldValueIsSerialized) {
+        byte[] oldValueBytes = (byte[]) this.oldValue;
+        out.write(oldValueBytes);
+      } else {
+        DataSerializer.writeObject(this.oldValue, out);
+      }
       if (this.versionTag != null) {
         InternalDataSerializer.invokeToData(this.versionTag, out);
       }
@@ -1072,6 +1027,37 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
       }
       return sb.toString();
     }
+
+    @Override
+    public boolean prefersOldSerialized() {
+      return true;
+    }
+
+    @Override
+    public boolean isUnretainedOldReferenceOk() {
+      return true;
+    }
+    
+    @Override
+    public boolean isCachedDeserializableValueOk() {
+      return true;
+    }
+
+    @Override
+    public void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized) {
+      // isSerialized does not matter.
+      // toData will just call writeObject
+      // and fromData will just call readObject
+      this.oldValue = ov;
+    }
+
+    @Override
+    public void importOldBytes(byte[] ov, boolean isSerialized) {
+      if (isSerialized) {
+        this.oldValueIsSerialized = true;
+      }
+      this.oldValue = ov;
+    }
   }
 
   /**
@@ -1156,4 +1142,72 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
   public void setSendDelta(boolean sendDelta) {
     this.sendDelta = sendDelta;
   }
+
+  @Override
+  public boolean prefersNewSerialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isUnretainedNewReferenceOk() {
+    return true;
+  }
+
+  private void setDeserializationPolicy(boolean isSerialized) {
+    if (!isSerialized) {
+      this.deserializationPolicy = DistributedCacheOperation.DESERIALIZATION_POLICY_NONE;
+    }
+  }
+
+  @Override
+  public void importNewObject(@Unretained(ENTRY_EVENT_NEW_VALUE) Object nv, boolean isSerialized) {
+    setDeserializationPolicy(isSerialized);
+    setValObj(nv);
+  }
+
+  @Override
+  public void importNewBytes(byte[] nv, boolean isSerialized) {
+    setDeserializationPolicy(isSerialized);
+    setValBytes(nv);
+  }
+
+  @Override
+  public boolean prefersOldSerialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isUnretainedOldReferenceOk() {
+    return true;
+  }
+
+  @Override
+  public boolean isCachedDeserializableValueOk() {
+    return false;
+  }
+  
+  private void setOldValueIsSerialized(boolean isSerialized) {
+    if (isSerialized) {
+      if (CachedDeserializableFactory.preferObject()) {
+        this.oldValueIsSerialized = true; //VALUE_IS_OBJECT;
+      } else {
+        // Defer serialization until toData is called.
+        this.oldValueIsSerialized = true; //VALUE_IS_SERIALIZED_OBJECT;
+      }
+    } else {
+      this.oldValueIsSerialized = false; //VALUE_IS_BYTES;
+    }
+  }
+  
+  public void importOldObject(@Unretained(ENTRY_EVENT_OLD_VALUE) Object ov, boolean isSerialized) {
+    setOldValueIsSerialized(isSerialized);
+    // Defer serialization until toData is called.
+    setOldValObj(ov);
+  }
+
+  @Override
+  public void importOldBytes(byte[] ov, boolean isSerialized) {
+    setOldValueIsSerialized(isSerialized);
+    setOldValBytes(ov);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
index 22ffb4a..a7d4f92 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteRemoveAllMessage.java
@@ -193,6 +193,7 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
     RemoveAllResponse p = new RemoveAllResponse(event.getRegion().getSystem(), recipients);
     RemoteRemoveAllMessage msg = new RemoteRemoveAllMessage(event, recipients, p,
         removeAllData, removeAllDataCount, useOriginRemote, processorType, possibleDuplicate);
+    msg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
     Set failures = event.getRegion().getDistributionManager().putOutgoing(msg);
     if (failures != null && failures.size() > 0) {
       throw new RemoteOperationException(LocalizedStrings.RemotePutMessage_FAILED_SENDING_0.toLocalizedString(msg));
@@ -330,9 +331,10 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
     final DistributedRegion dr = (DistributedRegion)r;
     
     // create a base event and a op for RemoveAllMessage distributed btw redundant buckets
-    EntryEventImpl baseEvent = new EntryEventImpl(
+    EntryEventImpl baseEvent = EntryEventImpl.create(
         r, Operation.REMOVEALL_DESTROY,
         null, null, this.callbackArg, false, eventSender, true);
+    try {
 
     baseEvent.setCausedByMessage(this);
     
@@ -347,6 +349,7 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
           eventSender, baseEvent, this);
     }
     final DistributedRemoveAllOperation op = new DistributedRemoveAllOperation(baseEvent, removeAllDataCount, false);
+    try {
     final VersionedObjectList versions = new VersionedObjectList(removeAllDataCount, true, dr.concurrencyChecksEnabled);
     dr.syncBulkOp(new Runnable() {
       @SuppressWarnings("synthetic-access")
@@ -354,6 +357,7 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
         InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
         for (int i = 0; i < removeAllDataCount; ++i) {
           EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(r, myId, eventSender, i, removeAllData, false, bridgeContext, posDup, false);
+          try {
           ev.setRemoveAllOperation(op);
           if (logger.isDebugEnabled()) {
             logger.debug("invoking basicDestroy with {}", ev);
@@ -364,6 +368,9 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
           }
           removeAllData[i].versionTag = ev.getVersionTag();
           versions.addKeyAndVersion(removeAllData[i].key, ev.getVersionTag());
+          } finally {
+            ev.release();
+          }
         }
       }
     }, baseEvent.getEventId());
@@ -373,6 +380,12 @@ public final class RemoteRemoveAllMessage extends RemoteOperationMessageWithDire
     RemoveAllReplyMessage.send(getSender(), this.processorId, 
         getReplySender(r.getDistributionManager()), versions, this.removeAllData, this.removeAllDataCount);
     return false;
+    } finally {
+      op.freeOffHeapResources();
+    }
+    } finally {
+      baseEvent.release();
+    }
   }
 
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
index 6d9a818..f7676b9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/SearchLoadAndWriteProcessor.java
@@ -65,6 +65,7 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 
 
 /**
@@ -839,21 +840,22 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
 
   private boolean doLocalWrite(CacheWriter writer,CacheEvent pevent,int paction)
   throws CacheWriterException {
-    CacheEvent event = getEventForListener(pevent);
     // Return if the inhibit all notifications flag is set
-    if (event instanceof EntryEventImpl) {
-      if (((EntryEventImpl)event).inhibitAllNotifications()){
+    if (pevent instanceof EntryEventImpl) {
+      if (((EntryEventImpl)pevent).inhibitAllNotifications()){
         if (logger.isDebugEnabled()) {
-          logger.debug("Notification inhibited for key {}", event);
+          logger.debug("Notification inhibited for key {}", pevent);
         }
         return false;
       }
     }
+    CacheEvent event = getEventForListener(pevent);
     
     int action = paction;
     if (event.getOperation().isCreate() && action == BEFOREUPDATE) {
       action = BEFORECREATE;
     }
+    try {
     switch(action) {
       case BEFORECREATE:
         writer.beforeCreate((EntryEvent)event);
@@ -874,6 +876,13 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
         break;
 
     }
+    } finally {
+      if (event != pevent) {
+        if (event instanceof EntryEventImpl) {
+          ((EntryEventImpl) event).release();
+        }
+      }
+    }
     this.localWrite = true;
     return true;
 
@@ -1920,7 +1929,11 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
                       long lastModified = entry.getLastModified();
                       lastModifiedCacheTime = lastModified;
                       if (eov instanceof CachedDeserializable) {
-                        {
+                        if (eov instanceof StoredObject && !((StoredObject) eov).isSerialized()) {
+                          isSer = false;
+                          ebv = (byte[]) ((StoredObject)eov).getDeserializedForReading();
+                          ebvLen = ebv.length;
+                        } else {
                           // don't serialize here if it is not already serialized
                           Object tmp = ((CachedDeserializable)eov).getValue();
                           if (tmp instanceof byte[]) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ServerPingMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ServerPingMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ServerPingMessage.java
new file mode 100644
index 0000000..5f98ea2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ServerPingMessage.java
@@ -0,0 +1,120 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.PooledDistributionMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Ping to check if a server is alive. It waits for a specified 
+ * time before returning false. 
+ * 
+ * @author hemantb
+ */
+public class ServerPingMessage extends PooledDistributionMessage {
+  private int processorId = 0;
+  
+  public ServerPingMessage() {
+  }
+
+  
+  public ServerPingMessage(ReplyProcessor21 processor) {
+    this.processorId = processor.getProcessorId();
+  }
+
+  
+  @Override
+  public int getDSFID() {
+    return SERVER_PING_MESSAGE;
+  }
+
+  /**
+   * Sends a ping message. The pre-GFXD_101 recipients are filtered out 
+   * and it is assumed that they are pingable. 
+   * 
+   * @return true if all the recipients are pingable
+   */
+  public static boolean send (GemFireCacheImpl cache, 
+      Set<InternalDistributedMember> recipients) {
+    
+    InternalDistributedSystem ids = cache.getDistributedSystem();
+    DM dm = ids.getDistributionManager();
+    Set <InternalDistributedMember> filteredRecipients = new HashSet<InternalDistributedMember>();
+     
+    // filtered recipients 
+    for (InternalDistributedMember recipient : recipients) {
+      if(Version.GFE_81.compareTo(recipient.getVersionObject()) <= 0) {
+        filteredRecipients.add(recipient);
+      } 
+    }
+    if (filteredRecipients == null || filteredRecipients.size() == 0)
+      return true;
+   
+    ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, filteredRecipients);
+    ServerPingMessage spm = new ServerPingMessage(replyProcessor);
+   
+    spm.setRecipients(filteredRecipients);
+    Set failedServers = null;
+    try {
+      if (cache.getLoggerI18n().fineEnabled())
+        cache.getLoggerI18n().fine("Pinging following servers " +  filteredRecipients);
+      failedServers = dm.putOutgoing(spm);
+      
+      // wait for the replies for timeout msecs
+      boolean receivedReplies = replyProcessor.waitForReplies(0L);
+      
+      dm.getCancelCriterion().checkCancelInProgress(null);
+      
+      // If the reply is not received in the stipulated time, throw an exception
+      if (!receivedReplies) {
+        cache.getLoggerI18n().error(LocalizedStrings.Server_Ping_Failure, filteredRecipients);
+        return false;
+      }
+    } catch (Throwable e) {
+      cache.getLoggerI18n().error(LocalizedStrings.Server_Ping_Failure, filteredRecipients, e );
+      return false;
+    }
+   
+    if (failedServers == null  || failedServers.size() == 0)
+      return true; 
+    
+    cache.getLoggerI18n().info(LocalizedStrings.Server_Ping_Failure, failedServers);
+    
+    return false;
+  }
+  
+  @Override
+  protected void process(DistributionManager dm) {
+    // do nothing. We are just pinging the server. send the reply back. 
+    ReplyMessage.send(getSender(), this.processorId,  null, dm); 
+  }
+
+ 
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    out.writeInt(this.processorId);
+  }
+  
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.processorId = in.readInt();
+  }
+  
+  @Override
+  public int getProcessorId() {
+    return this.processorId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index f091570..2253937 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -651,6 +651,7 @@ public final class TXCommitMessage extends PooledDistributionMessage implements
       if (tls.length > 0) {
         txEvent = new TXRmtEvent(this.txIdent, cache);
       }
+      try {
       // Pre-process each Region in the tx
       try {
         {
@@ -714,6 +715,11 @@ public final class TXCommitMessage extends PooledDistributionMessage implements
       }
     } catch(CancelException e) {
       processCacheRuntimeException(e);
+    } finally {
+      if (txEvent != null) {
+        txEvent.freeOffHeapResources();
+      }
+    }
     }
     finally {
       LocalRegion.setThreadInitLevelRequirement(oldLevel);
@@ -1270,6 +1276,7 @@ public final class TXCommitMessage extends PooledDistributionMessage implements
          * This happens when we don't have the bucket and are getting adjunct notification
          */
         EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey);
+        try {
         if(entryOp.filterRoutingInfo!=null) {
           eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
         }
@@ -1285,6 +1292,9 @@ public final class TXCommitMessage extends PooledDistributionMessage implements
         // In the latter case we need to invoke listeners
         final boolean skipListeners = !isDuplicate;
         eei.invokeCallbacks(this.r, skipListeners, true);
+        } finally {
+          eei.release();
+        }
         return;
       }
       if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
index b0583d2..cc9532f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
@@ -16,6 +16,7 @@ import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.StatisticsDisabledException;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 
 /** ******************* Class Entry ***************************************** */
 
@@ -98,11 +99,12 @@ public class TXEntry implements Region.Entry
     return this.keyInfo.getKey();
   }
 
+  @Unretained
   public Object getValue()
   {
     checkTX();
 //    Object value = this.localRegion.getDeserialized(this.key, false, this.myTX, this.rememberReads);
-    Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false);
+    @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false, false, false);
     if (value == null) {
       throw new EntryDestroyedException(this.keyInfo.getKey().toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
index d8e4ef4..0d44686 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryState.java
@@ -8,12 +8,17 @@
 
 package com.gemstone.gemfire.internal.cache;
 
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.TX_ENTRY_STATE;
+
+import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.logging.log4j.Logger;
+
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.cache.CacheRuntimeException;
 import com.gemstone.gemfire.cache.CacheWriter;
@@ -27,14 +32,22 @@ import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.delta.Delta;
-import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.lang.StringUtils;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.Releasable;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.pdx.PdxSerializationException;
 
 /**
@@ -46,11 +59,14 @@ import com.gemstone.gemfire.pdx.PdxSerializationException;
  * @since 4.0
  *  
  */
-public class TXEntryState
+public class TXEntryState implements Releasable
   {
+  private static final Logger logger = LogService.getLogger();
+  
   /**
    * This field is final except for when it is nulled out during cleanup
    */
+  @Retained(TX_ENTRY_STATE)
   private Object originalVersionId;
   private final Object originalValue;
 
@@ -209,6 +225,17 @@ public class TXEntryState
   private VersionTag remoteVersionTag = null;
 
   /**
+   * Next region version generated on the primary
+   */
+  private long nextRegionVersion = -1;
+  
+  /*
+   * For Distributed Transaction.
+   * THis value is set when applying commit
+   */
+  private transient DistTxThinEntryState distTxThinEntryState;
+  
+  /**
    * This constructor is used to create a singleton used by LocalRegion to
    * signal that noop invalidate op has been performed. The instance returned by
    * this constructor is just a marker; it is not good for anything else.
@@ -226,7 +253,7 @@ public class TXEntryState
   /**
    * This constructor is used when creating an entry
    */
-  protected TXEntryState(RegionEntry re, Object pvId, Object pv, TXRegionState txRegionState) {
+  protected TXEntryState(RegionEntry re, Object pvId, Object pv, TXRegionState txRegionState, boolean isDistributed) {
     Object vId = pvId;
     if (vId == null) {
       vId = Token.REMOVED_PHASE1;
@@ -247,6 +274,9 @@ public class TXEntryState
       this.refCountEntry = null;
     }
     this.txRegionState = txRegionState;
+    if (isDistributed) {
+      this.distTxThinEntryState = new DistTxThinEntryState();
+    }
   }
   
   public TXRegionState getTXRegionState() {
@@ -288,6 +318,7 @@ public class TXEntryState
    * Gets the pending value for near side operations. Special cases local
    * destroy and local invalidate to fix bug 34387.
    */
+  @Unretained
   public Object getNearSidePendingValue()
   {
     if (isOpLocalDestroy()) {
@@ -412,6 +443,7 @@ public class TXEntryState
     }
   }
 
+  @Retained
   public Object getValueInVM(Object key) throws EntryNotFoundException
   {
     if (!existsLocally()) {
@@ -535,7 +567,7 @@ public class TXEntryState
 //            && this.op != OP_LOCAL_CREATE && this.op != OP_SEARCH_PUT);
 //  }
 
-  private String opToString()
+  String opToString()
   {
     return opToString(this.op);
   }
@@ -912,6 +944,9 @@ public class TXEntryState
       eventRegion = r.getPartitionedRegion();
     }
     EntryEventImpl result = new TxEntryEventImpl(eventRegion, key);
+    // OFFHEAP: freeOffHeapResources on this event is called from TXEvent.freeOffHeapResources.
+    boolean returnedResult = false;
+    try {
     if (this.destroy == DESTROY_NONE || isOpDestroy()) {
       result.setOldValue(getOriginalValue());
     }
@@ -921,7 +956,11 @@ public class TXEntryState
       result.setOriginRemote(false);
     }
     result.setTransactionId(txs.getTransactionId());
+    returnedResult = true;
     return result;
+    } finally {
+      if (!returnedResult) result.release();
+    }
   }
 
   /**
@@ -1070,6 +1109,20 @@ public class TXEntryState
       }
     }
   }
+  
+
+  /* TODO OFFHEAP MERGE: is this code needed?
+  @Retained
+  protected final Object getRetainedValueInTXOrRegion() {
+    @Unretained Object val = this.getValueInTXOrRegion();
+    if (val instanceof Chunk) {
+      if (!((Chunk) val).retain()) {
+        throw new IllegalStateException("Could not retain OffHeap value=" + val);
+      }
+    }
+    return val;
+  }
+  */
 
   /**
    * Perform operation algebra
@@ -1413,6 +1466,9 @@ public class TXEntryState
   
   private boolean areIdentical(Object o1, Object o2) {
     if (o1 == o2) return true;
+    if (o1 instanceof StoredObject) {
+      if (o1.equals(o2)) return true;
+    }
     return false;
   }
 
@@ -1433,6 +1489,7 @@ public class TXEntryState
       r.checkReadiness();
       RegionEntry re = r.basicGetEntry(key);
       Object curCmtVersionId = null;
+      try {
       if ((re == null) || re.isValueNull()) {
         curCmtVersionId = Token.REMOVED_PHASE1;
       }
@@ -1463,6 +1520,9 @@ public class TXEntryState
           throw new CommitConflictException(LocalizedStrings.TXEntryState_ENTRY_FOR_KEY_0_ON_REGION_1_HAD_ALREADY_BEEN_CHANGED_FROM_2_TO_3.toLocalizedString(new Object[] {key, r.getDisplayName(), fromString, toString}));
         }
       }
+      } finally {
+        OffHeapHelper.release(curCmtVersionId);
+      }
     }
     catch (CacheRuntimeException ex) {
       r.getCancelCriterion().checkCancelInProgress(null);
@@ -1474,6 +1534,10 @@ public class TXEntryState
   {
     Object o = obj;
     if (o instanceof CachedDeserializable) {
+      if (o instanceof StoredObject && ((StoredObject) o).isCompressed()) {
+        // fix for bug 52113
+        return "<compressed value of size " + ((StoredObject) o).getValueSizeInBytes() + ">";
+      }
       try {
         o = ((CachedDeserializable)o).getDeserializedForReading();
       } catch (PdxSerializationException e) {
@@ -1731,7 +1795,12 @@ public class TXEntryState
   
   void applyChanges(LocalRegion r, Object key, TXState txState)
   {
-    
+    if (LogService.getLogger().isDebugEnabled()) {
+      LogService.getLogger().debug(
+          "applyChanges txState=" + txState + " ,key=" + key + " ,r="
+              + r.getDisplayName() + " ,op=" + this.op + " ,isDirty="
+              + isDirty());
+    }
     if (!isDirty()) {
       // all we did was read so just return
       return;
@@ -1853,6 +1922,15 @@ public class TXEntryState
   private Operation getUpdateOperation() {
     return isBulkOp() ? Operation.PUTALL_UPDATE : Operation.UPDATE;
   }
+
+  @Override
+  @Released(TX_ENTRY_STATE)
+  public void release() {
+    Object tmp = this.originalVersionId;
+    if (OffHeapHelper.release(tmp)) {
+      this.originalVersionId = null; // fix for bug 47900
+    }
+  }
   
   private Operation getDestroyOperation() {
     if (isOpLocalDestroy()) {
@@ -1953,6 +2031,7 @@ public class TXEntryState
     if (this.refCountEntry != null) {
       r.txDecRefCount(refCountEntry);
     }
+    close();
   }
 
   /**
@@ -1977,7 +2056,8 @@ public class TXEntryState
       //TODO:ASIF :Check if the eventID should be created. Currently not
       // creating it
       super(r, getNearSideOperation(), key,
-          getNearSidePendingValue(),TXEntryState.this.getCallbackArgument(), false, r.getMyId());
+          getNearSidePendingValue(),TXEntryState.this.getCallbackArgument(), false, r.getMyId()
+          , true/* generateCallbacks */, true /*initializeId*/);
     }
 
     /**
@@ -2017,9 +2097,9 @@ public class TXEntryState
       return new TXEntryState();
     }
 
-    public TXEntryState createEntry(RegionEntry re, Object vId, Object pendingValue, Object entryKey,TXRegionState txrs)
+    public TXEntryState createEntry(RegionEntry re, Object vId, Object pendingValue, Object entryKey,TXRegionState txrs,boolean isDistributed)
     {
-      return new TXEntryState(re, vId, pendingValue, txrs);
+      return new TXEntryState(re, vId, pendingValue, txrs, isDistributed);
     }
 
   };
@@ -2064,4 +2144,113 @@ public class TXEntryState
   public void setTailKey(long tailKey) {
     this.tailKey = tailKey;
   }
-}
+
+  public void close() {
+    release();
+  }
+  
+  public void setNextRegionVersion(long v) {
+    this.nextRegionVersion = v;
+  }
+  
+  public long getNextRegionVersion() {
+    return this.nextRegionVersion;
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder();
+    str.append("{").append(super.toString()).append(" ");
+    str.append(this.op);
+    str.append("}");
+    return str.toString();
+  }
+
+  public DistTxThinEntryState getDistTxEntryStates() {
+    return this.distTxThinEntryState;
+  }
+  
+  public void setDistTxEntryStates(DistTxThinEntryState thinEntryState) {
+    this.distTxThinEntryState = thinEntryState;
+  }
+
+  /**
+   * For Distributed Transaction Usage
+   * 
+   * This class is used to bring relevant information for DistTxEntryEvent from
+   * primary, after end of precommit. Same information are sent to all
+   * replicates during commit.
+   * 
+   * Whereas @see DistTxEntryEvent is used forstoring entry event information on
+   * TxCordinator and carry same to replicates.
+   * 
+   * @author vivekb
+   */
+  public static class DistTxThinEntryState implements DataSerializableFixedID {
+
+    private long regionVersion =1L;
+    private long tailKey = -1L;
+    private String memberID;
+
+    // For Serialization
+    public DistTxThinEntryState() {
+    }
+
+    @Override
+    public Version[] getSerializationVersions() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public int getDSFID() {
+      return DIST_TX_THIN_ENTRY_STATE;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeLong(this.regionVersion, out);
+      DataSerializer.writeLong(this.tailKey, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      this.regionVersion = DataSerializer.readLong(in);
+      this.tailKey = DataSerializer.readLong(in);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder buf = new StringBuilder();
+      buf.append("DistTxThinEntryState: ");
+      buf.append(" ,regionVersion=" + this.regionVersion);
+      buf.append(" ,tailKey=" + this.tailKey);
+      buf.append(" ,memberID=" + this.memberID);
+      return buf.toString();
+    }
+
+    public long getRegionVersion() {
+      return this.regionVersion;
+    }
+
+    public void setRegionVersion(long regionVersion) {
+      this.regionVersion = regionVersion;
+    }
+
+    public long getTailKey() {
+      return this.tailKey;
+    }
+
+    public void setTailKey(long tailKey) {
+      this.tailKey = tailKey;
+    }
+
+    public String getMemberID() {
+      return memberID;
+    }
+
+    public void setMemberID(String memberID) {
+      this.memberID = memberID;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryStateFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryStateFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryStateFactory.java
index 04796f3..b105eba 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryStateFactory.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntryStateFactory.java
@@ -24,6 +24,6 @@ public interface TXEntryStateFactory
    */
   public TXEntryState createEntry();
 
-  public TXEntryState createEntry(RegionEntry re, Object vId, Object pendingValue, Object entryKey,TXRegionState txrs);
+  public TXEntryState createEntry(RegionEntry re, Object vId, Object pendingValue, Object entryKey,TXRegionState txrs,boolean isDistributed);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
index 3e4a596..e84c1e7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
@@ -9,6 +9,7 @@
 package com.gemstone.gemfire.internal.cache;
 import com.gemstone.gemfire.cache.*;
 import java.util.*;
+import com.gemstone.gemfire.internal.offheap.Releasable;
 
 /** <p>The internal implementation of the {@link TransactionEvent} interface
  * 
@@ -17,7 +18,7 @@ import java.util.*;
  * @since 4.0
  * 
  */
-public class TXEvent implements TransactionEvent {
+public class TXEvent implements TransactionEvent, Releasable {
   private final TXStateInterface localTxState;
   private List events;
   private List createEvents = null;
@@ -122,4 +123,17 @@ public class TXEvent implements TransactionEvent {
   public final Cache getCache() {
     return this.cache;
   }
+
+  @Override
+  public synchronized void release() {
+    if (this.events != null) {
+      Iterator it = getEvents().iterator();
+      while (it.hasNext()) {
+        Object o = it.next();
+        if (o instanceof EntryEventImpl) {
+          ((EntryEventImpl) o).release();
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index 4e97574..88714b0 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@ -137,15 +137,21 @@ public final class TXManagerImpl implements CacheTransactionManager,
   public static boolean ALLOW_PERSISTENT_TRANSACTIONS = Boolean.getBoolean("gemfire.ALLOW_PERSISTENT_TRANSACTIONS");
 
   /**
-   * this keeps track of all the transactions that were initiated locally. Could have been
-   * a set, is a Map to allow concurrent operations.
+   * this keeps track of all the transactions that were initiated locally.
    */
-  private ConcurrentMap<TXId, Boolean> localTxMap = new ConcurrentHashMap<TXId, Boolean>();
+  private ConcurrentMap<TXId, TXStateProxy> localTxMap = new ConcurrentHashMap<TXId, TXStateProxy>();
 
   /**
    * the time in minutes after which any suspended transaction are rolled back. default is 30 minutes
    */
   private volatile long suspendedTXTimeout = Long.getLong("gemfire.suspendedTxTimeout", 30);
+  
+  /**
+   * Thread-specific flag to indicate whether the transactions managed by this
+   * CacheTransactionManager for this thread should be distributed
+   */
+  private final ThreadLocal<Boolean> isTXDistributed;
+  
 
   /** Constructor that implements the {@link CacheTransactionManager}
    * interface. Only only one instance per {@link com.gemstone.gemfire.cache.Cache}
@@ -163,6 +169,7 @@ public final class TXManagerImpl implements CacheTransactionManager,
     this.cachePerfStats = cachePerfStats;
     this.hostedTXStates = new HashMap<TXId, TXStateProxy>();
     this.txContext = new ThreadLocal<TXStateProxy>();
+    this.isTXDistributed = new ThreadLocal<Boolean>();
     currentInstance = this;
   }
 
@@ -284,8 +291,14 @@ public final class TXManagerImpl implements CacheTransactionManager,
       }
     }
     TXId id = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
-    setTXState(new TXStateProxyImpl(this, id, null));
-    this.localTxMap.put(id, Boolean.TRUE);
+    TXStateProxyImpl proxy = null;
+    if (isDistributed()) {
+      proxy = new DistTXStateProxyImplOnCoordinator(this, id, null);  
+    } else {
+      proxy = new TXStateProxyImpl(this, id, null);  
+    }
+    setTXState(proxy);
+    this.localTxMap.put(id, proxy);
   }
 
 
@@ -298,11 +311,33 @@ public final class TXManagerImpl implements CacheTransactionManager,
   public TXStateProxy beginJTA() {
     checkClosed();
     TXId id = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
-    TXStateProxy newState = new TXStateProxyImpl(this, id, true);
+    TXStateProxy newState = null;
+    
+    if (isDistributed()) {
+      newState = new DistTXStateProxyImplOnCoordinator(this, id, true);
+    } else {
+      newState = new TXStateProxyImpl(this, id, true);
+    }
     setTXState(newState);
     return newState;
   }
 
+  /*
+   * Only applicable for Distributed transaction.
+   */
+  public void precommit() throws CommitConflictException {
+    checkClosed();
+
+    final TXStateProxy tx = getTXState();
+    if (tx == null) {
+      throw new IllegalStateException(LocalizedStrings.TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION.toLocalizedString());
+    }
+    
+    tx.checkJTA(LocalizedStrings.TXManagerImpl_CAN_NOT_COMMIT_THIS_TRANSACTION_BECAUSE_IT_IS_ENLISTED_WITH_A_JTA_TRANSACTION_USE_THE_JTA_MANAGER_TO_PERFORM_THE_COMMIT.toLocalizedString());
+  
+    tx.precommit();
+  }
+  
   /** Complete the transaction associated with the current
    *  thread. When this method completes, the thread is no longer
    *  associated with a transaction.
@@ -326,9 +361,11 @@ public final class TXManagerImpl implements CacheTransactionManager,
     } catch (CommitConflictException ex) {
       saveTXStateForClientFailover(tx, TXCommitMessage.CMT_CONFLICT_MSG); //fixes #43350
       noteCommitFailure(opStart, lifeTime, tx);
+      cleanup(tx.getTransactionId()); // fixes #52086
       throw ex;
     } catch (TransactionDataRebalancedException reb) {
       saveTXStateForClientFailover(tx, TXCommitMessage.REBALANCE_MSG);
+      cleanup(tx.getTransactionId()); // fixes #52086
       throw reb;
     } catch (UnsupportedOperationInTransactionException e) {
       // fix for #42490
@@ -336,6 +373,7 @@ public final class TXManagerImpl implements CacheTransactionManager,
       throw e;
     } catch (RuntimeException e) {
       saveTXStateForClientFailover(tx, TXCommitMessage.EXCEPTION_MSG);
+      cleanup(tx.getTransactionId()); // fixes #52086
       throw e;
     }
     saveTXStateForClientFailover(tx);
@@ -348,10 +386,12 @@ public final class TXManagerImpl implements CacheTransactionManager,
     this.cachePerfStats.txFailure(opEnd - opStart,
                                   lifeTime, tx.getChanges());
     TransactionListener[] listeners = getListeners();
-    if(tx.isFireCallbacks()) {
+    if (tx.isFireCallbacks() && listeners.length > 0) {
+      final TXEvent e = tx.getEvent();
+      try {
       for (int i=0; i < listeners.length; i++) {
         try {
-          listeners[i].afterFailedCommit(tx.getEvent());
+          listeners[i].afterFailedCommit(e);
         } 
         catch (VirtualMachineError err) {
           SystemFailure.initiateFailure(err);
@@ -369,6 +409,9 @@ public final class TXManagerImpl implements CacheTransactionManager,
           logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t);
         }
       }
+      } finally {
+        e.release();
+      }
     }
   }
 
@@ -376,11 +419,13 @@ public final class TXManagerImpl implements CacheTransactionManager,
     long opEnd = CachePerfStats.getStatTime();
     this.cachePerfStats.txSuccess(opEnd - opStart,
                                   lifeTime, tx.getChanges());
-    if(tx.isFireCallbacks()) {
-      TransactionListener[] listeners = getListeners();
-      for (int i=0; i < listeners.length; i++) {
+    TransactionListener[] listeners = getListeners();
+    if (tx.isFireCallbacks() && listeners.length > 0) {
+      final TXEvent e = tx.getEvent();
+      try {
+      for (final TransactionListener listener : listeners) {
         try {
-          listeners[i].afterCommit(tx.getEvent());
+          listener.afterCommit(e);
         } 
         catch (VirtualMachineError err) {
           SystemFailure.initiateFailure(err);
@@ -398,6 +443,9 @@ public final class TXManagerImpl implements CacheTransactionManager,
           logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t);
         }
       }
+      } finally {
+        e.release();
+      }
     }
   }
   
@@ -439,10 +487,12 @@ public final class TXManagerImpl implements CacheTransactionManager,
     this.cachePerfStats.txRollback(opEnd - opStart,
                                    lifeTime, tx.getChanges());
     TransactionListener[] listeners = getListeners();
-    if(tx.isFireCallbacks()) {
-      for (int i=0; i < listeners.length; i++) {
+    if (tx.isFireCallbacks() && listeners.length > 0) {
+      final TXEvent e = tx.getEvent();
+      try {
+      for (int i = 0; i < listeners.length; i++) {
         try {
-          listeners[i].afterRollback(tx.getEvent());
+          listeners[i].afterRollback(e);
         } 
         catch (VirtualMachineError err) {
           SystemFailure.initiateFailure(err);
@@ -460,6 +510,9 @@ public final class TXManagerImpl implements CacheTransactionManager,
           logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t);
         }
       }
+      } finally {
+        e.release();
+      }
     }
   }
 
@@ -467,7 +520,10 @@ public final class TXManagerImpl implements CacheTransactionManager,
    * Called from Commit and Rollback to unblock waiting threads
    */
   private void cleanup(TransactionId txId) {
-    this.localTxMap.remove(txId);
+    TXStateProxy proxy = this.localTxMap.remove(txId);
+    if (proxy != null) {
+      proxy.close();
+    }
     Queue<Thread> waitingThreads = this.waitMap.get(txId);
     if (waitingThreads != null && !waitingThreads.isEmpty()) {
       for (Thread waitingThread : waitingThreads) {
@@ -535,6 +591,12 @@ public final class TXManagerImpl implements CacheTransactionManager,
       return;
     }
     this.closed = true;
+    for (TXStateProxy proxy: this.hostedTXStates.values()) {
+      proxy.close();
+    }
+    for (TXStateProxy proxy: this.localTxMap.values()) {
+      proxy.close();
+    }
     {
       TransactionListener[] listeners = getListeners();
       for (int i=0; i < listeners.length; i++) {
@@ -665,8 +727,13 @@ public final class TXManagerImpl implements CacheTransactionManager,
       synchronized(this.hostedTXStates) {
         val = this.hostedTXStates.get(key);
         if (val == null && msg.canStartRemoteTransaction()) {
-          val = new TXStateProxyImpl(this, key, msg.getTXOriginatorClient());
-          val.setLocalTXState(new TXState(val,true));
+          if (msg.isTransactionDistributed()) {
+            val = new DistTXStateProxyImplOnDatanode(this, key, msg.getTXOriginatorClient());
+            val.setLocalTXState(new DistTXState(val,true));
+          } else {
+            val = new TXStateProxyImpl(this, key, msg.getTXOriginatorClient());
+            val.setLocalTXState(new TXState(val,true));
+          }
           this.hostedTXStates.put(key, val);
         }
       }
@@ -676,6 +743,7 @@ public final class TXManagerImpl implements CacheTransactionManager,
         val.getLock().lock();
       }
     }
+
     setTXState(val);
     return val;
   }
@@ -701,8 +769,14 @@ public final class TXManagerImpl implements CacheTransactionManager,
       synchronized(this.hostedTXStates) {
         val = this.hostedTXStates.get(key);
         if (val == null && msg.canStartRemoteTransaction()) {
-          val = new TXStateProxyImpl(this, key,memberId);
-//          val.setLocalTXState(new TXState(val,true));
+          // [sjigyasu] TODO: Conditionally create object based on distributed or non-distributed tx mode 
+          if (msg instanceof TransactionMessage && ((TransactionMessage)msg).isTransactionDistributed()) {
+            val = new DistTXStateProxyImplOnDatanode(this, key, memberId);
+            //val.setLocalTXState(new DistTXState(val,true));
+          } else {
+            val = new TXStateProxyImpl(this, key, memberId);
+            //val.setLocalTXState(new TXState(val,true));
+          }
           this.hostedTXStates.put(key, val);
         }
       }
@@ -757,7 +831,11 @@ public final class TXManagerImpl implements CacheTransactionManager,
    */
   public TXStateProxy removeHostedTXState(TXId txId) {
     synchronized (this.hostedTXStates) {
-      return this.hostedTXStates.remove(txId);
+      TXStateProxy result = this.hostedTXStates.remove(txId);
+      if (result != null) {
+        result.close();
+      }
+      return result;
     }
   }
   
@@ -771,6 +849,7 @@ public final class TXManagerImpl implements CacheTransactionManager,
       while (iterator.hasNext()) {
         Entry<TXId, TXStateProxy> entry = iterator.next();
         if (entry.getValue().isOnBehalfOfClient()) {
+          entry.getValue().close();
           if (logger.isDebugEnabled()) {
             logger.debug("Cleaning up TXStateProxy for {}", entry.getKey());
           }
@@ -809,13 +888,18 @@ public final class TXManagerImpl implements CacheTransactionManager,
       return this.hostedTXStates.size();
     }
   }
+  public int localTransactionsInProgressForTest() {
+    return this.localTxMap.size();
+  }
 
   public void memberDeparted(InternalDistributedMember id, boolean crashed) {
     synchronized (this.hostedTXStates) {
-      Iterator<TXId> iterator = this.hostedTXStates.keySet().iterator();
+      Iterator<Map.Entry<TXId,TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
       while (iterator.hasNext()) {
-        TXId txId = iterator.next();
+        Map.Entry<TXId,TXStateProxy> me = iterator.next();
+        TXId txId = me.getKey();
         if (txId.getMemberId().equals(id)) {
+          me.getValue().close();
           if (logger.isDebugEnabled()) {
             logger.debug("Received memberDeparted, cleaning up txState:{}", txId);
           }
@@ -863,6 +947,7 @@ public final class TXManagerImpl implements CacheTransactionManager,
       while (iterator.hasNext()) {
         Map.Entry<TXId,TXStateProxy> entry = iterator.next();
         if (txIds.contains(entry.getKey())) {
+          entry.getValue().close();
           iterator.remove();
         }
       }
@@ -1366,4 +1451,47 @@ public final class TXManagerImpl implements CacheTransactionManager,
       return true;
     }
   }
+
+  // Used by tests
+  public Set<TXId> getLocalTxIds() {
+    return this.localTxMap.keySet();
+  }
+
+  // Used by tests
+  public ArrayList<TXId> getHostedTxIds() {
+    synchronized (this.hostedTXStates) {
+      return new ArrayList<TXId>(this.hostedTXStates.keySet());
+    }
+  }
+  
+  public void setDistributed(boolean flag) {
+    checkClosed();
+    TXStateProxy tx = getTXState();
+    // Check whether given flag and current flag are different and whether a transaction is in progress
+    if (tx != null && flag != isDistributed()) {
+      // Cannot change mode in the middle of a transaction
+      throw new java.lang.IllegalStateException(
+          LocalizedStrings.TXManagerImpl_CANNOT_CHANGE_TRANSACTION_MODE_WHILE_TRANSACTIONS_ARE_IN_PROGRESS
+              .toLocalizedString());
+    } else {
+      isTXDistributed.set(new Boolean(flag));
+    }
+  }
+
+  /*
+   * If explicitly set using setDistributed, this returns that value.
+   * If not, it returns the value of gemfire property "distributed-transactions" if set.
+   * If this is also not set, it returns the default value of this property.
+   */
+  public boolean isDistributed() {
+    
+     Boolean value = isTXDistributed.get();
+    // This can be null if not set in setDistributed().
+    if (value == null) {
+      return InternalDistributedSystem.getAnyInstance().getOriginalConfig().getDistributedTransactions();
+    } else {
+      return value.booleanValue();
+    }
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXMessage.java
index 3b482bf..c38f2ff 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXMessage.java
@@ -197,4 +197,10 @@ public abstract class TXMessage extends SerialDistributionMessage
   public boolean canParticipateInTransaction() {
     return true;
   }
+
+  @Override
+  public boolean isTransactionDistributed() {
+    return false;
+  }
+
 }


Mime
View raw message