geode-commits mailing list archives

From r..@apache.org
Subject [09/51] [partial] incubator-geode git commit: SGA #2
Date Fri, 03 Jul 2015 19:21:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
new file mode 100644
index 0000000..7939b77
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
@@ -0,0 +1,400 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.TreeSet;
+
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * TXState on the TX coordinator, created when the coordinator is also a data
+ * node.
+ * 
+ * @author shirishd
+ */
+public final class DistTXStateOnCoordinator extends DistTXState implements
+    DistTXCoordinatorInterface {
+  
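+  // ops for which this member hosts the primary copy of the data, and ops
+  // for which it hosts a redundant copy; the secondary list is applied to
+  // this node during precommit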
+  private ArrayList<DistTxEntryEvent> primaryTransactionalOperations = null;
+  private ArrayList<DistTxEntryEvent> secondaryTransactionalOperations = null;
+  
+  private boolean preCommitResponse = false;
+  private boolean rollbackResponse = false;
+
+  public DistTXStateOnCoordinator(TXStateProxy proxy,
+      boolean onBehalfOfRemoteStub) {
+    super(proxy, onBehalfOfRemoteStub);
+    primaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
+    secondaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
+  }
+  
+  public final ArrayList<DistTxEntryEvent> getPrimaryTransactionalOperations()
+      throws UnsupportedOperationInTransactionException {
+    return primaryTransactionalOperations;
+  }
+  
+  private final void addPrimaryTransactionalOperations(DistTxEntryEvent dtop) {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove these
+      logger.debug(
+          "DistTXStateOnCoordinator.addPrimaryTransactionalOperations add {}"
+              + " ,stub before={} ,isUpdatingTxStateDuringPreCommit={}",
+          dtop, this, isUpdatingTxStateDuringPreCommit());
+    }
+    if (!isUpdatingTxStateDuringPreCommit()) {
+      primaryTransactionalOperations.add(dtop);
+      // [DISTTX] TODO Remove this
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateOnCoordinator.addPrimaryTransactionalOperations add primary op = {}",
+            dtop);
+      }
+    }
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove these
+      logger.debug(
+          "DistTXStateOnCoordinator.addPrimaryTransactionalOperations stub after add = {}",
+          this);
+    }
+  }
+  
+  public final void addSecondaryTransactionalOperations(DistTxEntryEvent dtop)
+      throws UnsupportedOperationInTransactionException {
+    secondaryTransactionalOperations.add(dtop);
+  }
+  
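+  /**
+   * Phase one of the distributed commit on this node: first applies the
+   * queued secondary (redundant-copy) operations, and only if that succeeds
+   * runs the regular precommit. The outcome is cached in
+   * {@code preCommitResponse} and read back via {@link #getPreCommitResponse()}.
+   */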
+  @Override
+  public void precommit() {
+    boolean retVal = applyOpsOnRedundantCopy(this.proxy.getCache()
+        .getDistributedSystem().getDistributedMember(),
+        this.secondaryTransactionalOperations);
+    if (retVal) {
+      super.precommit();
+    }
+    this.preCommitResponse = retVal; // remains false if an exception was thrown above
+  }
+  
+  @Override
+  public void rollback() {
+    super.rollback();
+    this.rollbackResponse = true; // not reached if super.rollback() threw
+    // Cleanup is called next
+  }
+    
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.TXStateStub#putEntry(com.gemstone.gemfire
+   * .internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object,
+   * boolean, long, boolean)
+   */
+  @Override
+  public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
+      Object expectedOldValue, boolean requireOldValue, long lastModified,
+      boolean overwriteDestroyed) {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove throwable
+      logger.debug("DistTXStateOnCoordinator.putEntry "
+          + event.getKeyInfo().getKey(), new Throwable());
+    }
+    
+    boolean returnValue = super.putEntry(event, ifNew, ifOld, expectedOldValue,
+        requireOldValue, lastModified, overwriteDestroyed);
+    
+    // putAll event is already added in postPutAll, don't add individual events
+    // from the putAll operation again
+    if (!event.getOperation().isPutAll()) {
+      addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+    return returnValue;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.InternalDataView#putEntryOnRemote(com
+   * .gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean,
+   * java.lang.Object, boolean, long, boolean)
+   */
+  @Override
+  public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
+      boolean ifOld, Object expectedOldValue, boolean requireOldValue,
+      long lastModified, boolean overwriteDestroyed)
+      throws DataLocationException {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove throwable
+      logger.debug("DistTXStateOnCoordinator.putEntryOnRemote "
+          + event.getKeyInfo().getKey(), new Throwable());
+    }
+    
+    boolean returnValue = super.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue,
+        requireOldValue, lastModified, overwriteDestroyed);
+    
+    // putAll event is already added in postPutAll, don't add individual events
+    // from the putAll operation again
+    if (!event.getOperation().isPutAll()) {
+      addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+    return returnValue;
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.TXStateInterface#destroyExistingEntry
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean,
+   * java.lang.Object)
+   */
+  public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
+      Object expectedOldValue) throws EntryNotFoundException {
+//    logger.debug("DistTXStateOnCoordinator.destroyExistingEntry", new Throwable());
+
+    super.destroyExistingEntry(event, cacheWrite, expectedOldValue);
+    
+    // removeAll event is already added in postRemoveAll, don't add individual
+    // events from the removeAll operation again
+    if (!event.getOperation().isRemoveAll()) {
+      addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.InternalDataView#destroyOnRemote(java
+   * .lang.Integer, com.gemstone.gemfire.internal.cache.EntryEventImpl,
+   * java.lang.Object)
+   */
+  public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite,
+      Object expectedOldValue) throws DataLocationException {
+//    logger.debug("DistTXStateOnCoordinator.destroyOnRemote", new Throwable());
+    
+    super.destroyOnRemote(event, cacheWrite, expectedOldValue);
+    
+    // removeAll event is already added in postRemoveAll, don't add individual
+    // events from the removeAll operation again
+    if (!event.getOperation().isRemoveAll()) {
+    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.TXStateInterface#invalidateExistingEntry
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
+   */
+  public void invalidateExistingEntry(EntryEventImpl event,
+      boolean invokeCallbacks, boolean forceNewEntry) {
+//    logger
+//        .debug("DistTXStateOnCoordinator.invalidateExistingEntry", new Throwable());
+    
+    super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
+    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.InternalDataView#invalidateOnRemote
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
+   */
+  public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
+      boolean forceNewEntry) throws DataLocationException {
+//    logger.debug("DistTXStateOnCoordinator.invalidateOnRemote", new Throwable());
+    super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
+    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+  }
+  
+  
+  public void postPutAll(DistributedPutAllOperation putallOp,
+      VersionedObjectList successfulPuts, LocalRegion region) {
+    super.postPutAll(putallOp, successfulPuts, region);
+    EntryEventImpl event = EntryEventImpl.createPutAllEvent(putallOp, region,
+        Operation.PUTALL_CREATE, putallOp.getBaseEvent().getKey(), putallOp
+            .getBaseEvent().getValue());
+    event.setEventId(putallOp.getBaseEvent().getEventId());
+    DistTxEntryEvent dtop = new DistTxEntryEvent(event);
+    dtop.setPutAllOperation(putallOp);
+    addPrimaryTransactionalOperations(dtop);
+  }
+  
+  public void postRemoveAll(DistributedRemoveAllOperation removeAllOp,
+      VersionedObjectList successfulOps, LocalRegion region) {
+    super.postRemoveAll(removeAllOp, successfulOps, region);
+    EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(removeAllOp,
+        region, removeAllOp.getBaseEvent().getKey());
+    event.setEventId(removeAllOp.getBaseEvent().getEventId());
+    DistTxEntryEvent dtop = new DistTxEntryEvent(event);
+    dtop.setRemoveAllOperation(removeAllOp);
+    addPrimaryTransactionalOperations(dtop);
+  }
+  
+  @Override
+  public boolean getPreCommitResponse()
+      throws UnsupportedOperationInTransactionException {
+    return this.preCommitResponse;
+  }
+
+  @Override
+  public boolean getRollbackResponse()
+      throws UnsupportedOperationInTransactionException {
+    return this.rollbackResponse;
+  }
+  
+  @Override
+  public void setPrecommitMessage(DistTXPrecommitMessage precommitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("setPrecommitMessage"));
+  }
+  
+  @Override
+  public void setCommitMessage(DistTXCommitMessage commitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("setCommitMessage"));
+  }
+
+  @Override
+  public void setRollbackMessage(DistTXRollbackMessage rollbackMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_ROLLBACK_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("setRollbackMessage"));
+  }
+  
+  @Override
+  public void gatherAffectedRegions(HashSet<LocalRegion> regionSet,
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException {
+    if (includePrimaryRegions) {
+      for (DistTxEntryEvent dtos : this.primaryTransactionalOperations) {
+        regionSet.add(dtos.getRegion());
+      }
+    }
+    if (includeRedundantRegions) {
+      for (DistTxEntryEvent dtos : this.secondaryTransactionalOperations) {
+        regionSet.add(dtos.getRegion());
+      }
+    }
+  }
+  
+  @Override
+  public void gatherAffectedRegionsName(TreeSet<String> sortedRegionName,
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException {
+    if (includePrimaryRegions) {
+      gatherAffectedRegions(sortedRegionName,
+          this.primaryTransactionalOperations);
+    }
+    if (includeRedundantRegions) {
+      gatherAffectedRegions(sortedRegionName,
+          this.secondaryTransactionalOperations);
+    }
+  }
+  
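+  /**
+   * Collects the affected region paths in sorted order; for a partitioned
+   * region the bucket's full path is used, so that per-region entry version
+   * lists can later be matched up by index across members.
+   */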
+  public static void gatherAffectedRegions(TreeSet<String> sortedRegionName,
+      ArrayList<DistTxEntryEvent> regionOps) {
+    for (DistTxEntryEvent dtos : regionOps) {
+      LocalRegion lr = dtos.getRegion();
+      if (lr instanceof PartitionedRegion) {
+        sortedRegionName.add(PartitionedRegionHelper.getBucketFullPath(
+            lr.getFullPath(), dtos.getKeyInfo().getBucketId()));
+      } else {
+        sortedRegionName.add(lr.getFullPath());
+      }
+    }
+  }
+  
+  /**
+   * {@inheritDoc}
+   * 
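+   * Replays one queued transactional operation on this (coordinator) data
+   * node, dispatching on the operation type: putAll and removeAll are
+   * re-driven through postPutAll / postRemoveAll, while single-entry creates,
+   * updates, destroys and invalidates are applied directly.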
+   */
+  @Override
+  protected boolean applyIndividualOp(DistTxEntryEvent dtop)
+      throws DataLocationException {
+    boolean result = true;
+    if (dtop.op.isUpdate() || dtop.op.isCreate()) {
+      if (dtop.op.isPutAll()) {
+        assert (dtop.getPutAllOperation() != null);
+        // [DISTTX] TODO what to do with versions next?
+        final VersionedObjectList versions = new VersionedObjectList(
+            dtop.getPutAllOperation().putAllDataSize, true,
+            dtop.region.concurrencyChecksEnabled);
+        postPutAll(dtop.getPutAllOperation(), versions, dtop.region);
+      } else {
+        result = putEntry(dtop, false/* ifNew */, dtop.hasDelta()/* ifOld */,
+            null/* expectedOldValue */, false/* requireOldValue */,
+            0L/* lastModified */, true/* overwriteDestroyed - *not* used */);
+      }
+      }
+    } else if (dtop.op.isDestroy()) {
+      if (dtop.op.isRemoveAll()) {
+        assert (dtop.getRemoveAllOperation() != null);
+        // [DISTTX] TODO what to do with versions next?
+        final VersionedObjectList versions = new VersionedObjectList(
+            dtop.getRemoveAllOperation().removeAllDataSize, true,
+            dtop.region.concurrencyChecksEnabled);
+        postRemoveAll(dtop.getRemoveAllOperation(), versions, dtop.region);
+      } else {
+        destroyExistingEntry(dtop, false/* cacheWrite, TODO [DISTTX] */,
+            null/* expectedOldValue, TODO [DISTTX] */);
+      }
+    } else if (dtop.op.isInvalidate()) {
+      invalidateExistingEntry(dtop, true/* invokeCallbacks, TODO [DISTTX] */,
+          false/* forceNewEntry, TODO [DISTTX] */);
+    } else {
+      logger.debug("DistTXCommitPhaseOneMessage: unsupported TX operation {}",
+          dtop);
+      assert (false);
+    }
+    return result;
+  }
+
+  
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(super.toString());
+    builder.append(" ,primary txOps=").append(this.primaryTransactionalOperations);
+    builder.append(" ,secondary txOps=").append(this.secondaryTransactionalOperations);
+    builder.append(" ,preCommitResponse=").append(this.preCommitResponse);
+    builder.append(" ,rollbackResponse=").append(this.rollbackResponse);
+    return builder.toString();
+  }
+
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return true;
+  }
+  
+  @Override
+  public void finalCleanup() {
+    cleanup();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java
new file mode 100644
index 0000000..8d9ad09
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java
@@ -0,0 +1,32 @@
+package com.gemstone.gemfire.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Base class for the TX proxies used with distributed transactions; it marks
+ * the transaction as a distributed one.
+ * 
+ * @author shirishd
+ */
+public abstract class DistTXStateProxyImpl extends TXStateProxyImpl {
+
+  protected static final Logger logger = LogService.getLogger();
+
+  public DistTXStateProxyImpl(TXManagerImpl managerImpl, TXId id,
+      InternalDistributedMember clientMember) {
+    super(managerImpl, id, clientMember);
+  }
+
+  public DistTXStateProxyImpl(TXManagerImpl managerImpl, TXId id, boolean isjta) {
+    super(managerImpl, id, isjta);
+  }
+  
+  @Override
+  public boolean isDistTx() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
new file mode 100644
index 0000000..15291b0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -0,0 +1,1034 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.TransactionInDoubtException;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.DistTXPrecommitMessage.DistTxPrecommitResponse;
+import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData;
+import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.DistClientTXStateStub;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
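+/**
+ * TX state proxy used on the transaction coordinator. It keeps one
+ * {@link DistTXCoordinatorInterface} per participating member in
+ * {@code target2realDeals} and drives the two-phase (precommit, then commit)
+ * distributed transaction protocol across them, rolling back when precommit
+ * fails.
+ *
+ * A minimal usage sketch of the flow this class serves (the
+ * {@code setDistributed} switch is assumed here and is not part of this diff):
+ *
+ * <pre>
+ * CacheTransactionManager txMgr = cache.getCacheTransactionManager();
+ * txMgr.setDistributed(true); // assumed DISTTX enable switch
+ * txMgr.begin();
+ * region.put("key", "value"); // recorded as a primary transactional op
+ * txMgr.commit();             // precommit on all copies, then commit
+ * </pre>
+ */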
+public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
+
+  /**
+   * A map from distributed system member to either {@link DistPeerTXStateStub}
+   * or {@link DistTXStateOnCoordinator} (the latter when the TX coordinator is
+   * also a data node).
+   */
+  protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals = new HashMap<>();
+  private HashMap<LocalRegion, DistributedMember> rrTargets;
+  private Set<DistributedMember> txRemoteParticpants = null; // participants other than the local member
+  protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
+
+  public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
+      InternalDistributedMember clientMember) {
+    super(managerImpl, id, clientMember);
+  }
+
+  public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
+      boolean isjta) {
+    super(managerImpl, id, isjta);
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
+   * 
+   * [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure
+   * these messages reach all participants
+   */
+  @Override
+  public void commit() throws CommitConflictException {
+    boolean preserveTx = false;
+    boolean precommitResult = false;
+    try {
+      // create a map of secondary(for PR) / replica(for RR) to stubs to send
+      // commit message to those
+      HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals = getSecondariesAndReplicasForTxOps();
+      // add it to the existing map and then send commit to all copies
+      target2realDeals.putAll(otherTargets2realDeals);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator.commit target2realDeals = {}",
+            target2realDeals);
+      }
+
+      precommitResult = doPrecommit();
+      if (precommitResult) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.commit Going for commit");
+        }
+        boolean phase2commitDone = doCommit();
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit "
+              + (phase2commitDone ? "Done" : "Failed"));
+        }
+        // [DISTTX] TODO Handle this exception well
+        if (!phase2commitDone) {
+          throw new TransactionInDoubtException(
+              LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER
+                  .toLocalizedString());
+        }
+      } else {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "DistTXStateProxyImplOnCoordinator.commit precommitResult = {}",
+              precommitResult);
+        }
+      }
+    } catch (CommitConflictException e) {
+      throw e;
+    } catch (UnsupportedOperationInTransactionException e) {
+      // fix for #42490
+      preserveTx = true;
+      throw e;
+    } finally {
+      // [DISTTX] TODO What about rollback exceptions?
+      if (!precommitResult) {
+        rollback();
+      }
+
+      inProgress = preserveTx;
+      if (this.synchRunnable != null) {
+        this.synchRunnable.abort();
+      }
+    }
+  }
+
+  /**
+   * Creates a map of every secondary (for PR) / replica (for RR) holding data
+   * touched by this TX to the stub used to send the commit message to it.
+   */
+  private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("getSecondariesAndReplicasForTxOps");
+    InternalDistributedMember currentNode = cache.getDistributedSystem()
+        .getDistributedMember();
+    
+    HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals = new HashMap<>();
+    for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals
+        .entrySet()) {
+      DistributedMember originalTarget = e.getKey();
+      DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
+
+      ArrayList<DistTxEntryEvent> primaryTxOps = distPeerTxStateStub
+          .getPrimaryTransactionalOperations();
+      for (DistTxEntryEvent dtop : primaryTxOps) {
+        LocalRegion lr = dtop.getRegion();
+        // replicas or secondaries
+        Set<InternalDistributedMember> otherNodes = null;
+        if (lr instanceof PartitionedRegion) {
+          Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop
+              .getRegion()).getRegionAdvisor().getBucketOwners(
+                  dtop.getKeyInfo().getBucketId());
+          allNodes.remove(originalTarget);
+          otherNodes = allNodes;
+        } else if (lr instanceof DistributedRegion) {
+          otherNodes = ((DistributedRegion) lr).getCacheDistributionAdvisor()
+              .adviseInitializedReplicates();
+          otherNodes.remove(originalTarget);
+        }
+
+        if (otherNodes != null) {
+          for (InternalDistributedMember dm : otherNodes) {
+            // whether the target already exists due to other Tx op on the node
+            DistTXCoordinatorInterface existingDistPeerTXStateStub = target2realDeals
+                .get(dm);
+            if (existingDistPeerTXStateStub == null) {
+              existingDistPeerTXStateStub = secondaryTarget2realDeals.get(dm);
+              if (existingDistPeerTXStateStub == null) {
+                DistTXCoordinatorInterface newTxStub = null;
+                if (currentNode.equals(dm)) {
+                  // [DISTTX] TODO add a test case for this condition?
+                  newTxStub = new DistTXStateOnCoordinator(this, false);
+                } else {
+                  newTxStub = new DistPeerTXStateStub(this, dm,
+                      onBehalfOfClientMember);
+                }
+                newTxStub.addSecondaryTransactionalOperations(dtop);
+                secondaryTarget2realDeals.put(dm, newTxStub);
+              } else {
+                existingDistPeerTXStateStub
+                    .addSecondaryTransactionalOperations(dtop);
+              }
+            } else {
+              existingDistPeerTXStateStub
+                  .addSecondaryTransactionalOperations(dtop);
+            }
+          }
+        }
+      }
+    }
+    return secondaryTarget2realDeals;
+  }
+
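+  /**
+   * Sends {@link DistTXRollbackMessage} to every remote participant, rolls
+   * back on the local node if it holds TX state, and then waits for and
+   * AND-combines the per-member rollback responses.
+   */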
+  @Override
+  public void rollback() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXStateProxyImplOnCoordinator.rollback Going for rollback");
+    }
+    
+    boolean finalResult = true; // ANDed with every member's rollback response
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("Applying Dist TX Rollback");
+    final DM dm = cache.getDistributionManager();
+    try {
+      // Create Tx Participants
+      Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
+      
+      // create processor and rollback message
+      DistTXRollbackMessage.DistTxRollbackReplyProcessor processor = new DistTXRollbackMessage.DistTxRollbackReplyProcessor(
+          this.getTxId(), dm, txRemoteParticpants, target2realDeals);
+      // TODO [DISTTX] what's the ack threshold?
+      processor.enableSevereAlertProcessing();
+      final DistTXRollbackMessage rollbackMsg = new DistTXRollbackMessage(
+          this.getTxId(), this.onBehalfOfClientMember, processor);
+
+      // send rollback message to remote nodes
+      for (DistributedMember remoteNode : txRemoteParticpants) {
+        DistTXCoordinatorInterface remoteTXStateStub = target2realDeals
+            .get(remoteNode);
+        if (remoteTXStateStub.isTxState()) {
+          throw new UnsupportedOperationInTransactionException(
+              LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                  "DistPeerTXStateStub", remoteTXStateStub.getClass()
+                      .getSimpleName()));
+        }
+        try {
+          remoteTXStateStub.setRollbackMessage(rollbackMsg, dm);
+          remoteTXStateStub.rollback();
+        } finally {
+          remoteTXStateStub.setRollbackMessage(null, null);
+          remoteTXStateStub.finalCleanup();
+        }
+        if (logger.isDebugEnabled()) { // TODO - make this trace level
+          logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = {}",
+              remoteNode);
+        }
+      }
+
+      // Do rollback on local node
+      DistTXCoordinatorInterface localTXState = target2realDeals
+          .get(dm.getId());
+      if (localTXState != null) {
+        if (!localTXState.isTxState()) {
+          throw new UnsupportedOperationInTransactionException(
+              LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                  "DistTXStateOnCoordinator", localTXState.getClass()
+                      .getSimpleName()));
+        }
+        localTXState.rollback();
+        boolean localResult = localTXState.getRollbackResponse();
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "DistTXStateProxyImplOnCoordinator.rollback local = {} ,result= {} ,finalResult-old= {}",
+              dm.getId(), localResult, finalResult);
+        }
+        finalResult = finalResult && localResult;
+      }
+
+      /*
+       * [DISTTX] TODO Any test hooks
+       */
+      // if (internalAfterIndividualSend != null) {
+      // internalAfterIndividualSend.run();
+      // }
+
+      /*
+       * [DISTTX] TODO see how to handle exception
+       */
+
+      /*
+       * [DISTTX] TODO Any test hooks
+       */
+      // if (internalAfterIndividualCommitProcess != null) {
+      // // Testing callback
+      // internalAfterIndividualCommitProcess.run();
+      // }
+
+      { // Wait for results
+        dm.getCancelCriterion().checkCancelInProgress(null);
+        processor.waitForPrecommitCompletion();
+
+        // [DISTTX] TODO Handle stats
+        // dm.getStats().incCommitWaits();
+
+        Map<DistributedMember, Boolean> remoteResults = processor
+            .getRollbackResponseMap();
+        for (Entry<DistributedMember, Boolean> e : remoteResults.entrySet()) {
+          DistributedMember target = e.getKey();
+          Boolean remoteResult = e.getValue();
+          if (logger.isDebugEnabled()) { // TODO - make this trace level
+            logger.debug(
+                "DistTXStateProxyImplOnCoordinator.rollback target = {} ,result= {} ,finalResult-old= {}",
+                target, remoteResult, finalResult);
+          }
+          finalResult = finalResult && remoteResult;
+        }
+      }
+
+    } finally {
+      inProgress = false;
+      if (this.synchRunnable != null) {
+        this.synchRunnable.abort();
+      }
+    }
+
+    /*
+     * [DISTTX] TODO Write similar method to take out exception
+     * 
+     * [DISTTX] TODO Handle Reliable regions
+     */
+    // if (this.hasReliableRegions) {
+    // checkDistributionReliability(distMap, processor);
+    // }
+    
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXStateProxyImplOnCoordinator.rollback finalResult= "
+          + finalResult);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
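+   * 
+   * Resolves the member that owns {@code key} in region {@code r} (the
+   * primary bucket owner for a partitioned region, a cached replica target
+   * for a replicated region) and returns the TX state for that target,
+   * creating it on demand: a {@link DistTXStateOnCoordinator} when the target
+   * is the local member, otherwise a {@link DistPeerTXStateStub}.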
+   */
+  @Override
+  public TXStateInterface getRealDeal(KeyInfo key, LocalRegion r) {
+    if (r != null) {
+      target = null;
+      // wait for the region to be initialized fixes bug 44652
+      r.waitOnInitialization(r.initializationLatchBeforeGetInitialImage);
+      if (r instanceof PartitionedRegion) {
+        target = getOwnerForKey(r, key);
+      } else if (r instanceof BucketRegion) { 
+        target = ((BucketRegion) r).getBucketAdvisor().getPrimary();
+//        target = r.getMyId();
+      } else { //replicated region
+        target = getRRTarget(key, r);
+      }
+      this.realDeal = target2realDeals.get(target);
+    }
+    if (this.realDeal == null) {
+//      assert (r != null);
+       if (r == null) { // TODO: stop gap to get tests working
+         this.realDeal = new DistTXStateOnCoordinator(this, false);
+         target = this.txMgr.getDM().getId();
+       } else {
+        // Code to keep going forward
+        if (r.hasServerProxy()) {
+          // TODO [DISTTX] See what we need for client?
+          this.realDeal = new DistClientTXStateStub(this, target, r);
+          if (r.scope.isDistributed()) {
+            if (txDistributedClientWarningIssued.compareAndSet(false, true)) {
+              logger
+                  .warn(LocalizedMessage
+                      .create(
+                          LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
+                          r.getFullPath()));
+            }
+          }
+        } else {
+          // (r != null) code block above
+          if (target == null || target.equals(this.txMgr.getDM().getId())) {
+            this.realDeal = new DistTXStateOnCoordinator(this, false);
+          } else {
+            this.realDeal = new DistPeerTXStateStub(this, target,
+                onBehalfOfClientMember);
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMgr:{} proxy {} target {}",
+            this.realDeal, this.txMgr.getDM().getId(), this, target,
+            new Throwable());
+      }
+      target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = {}",
+            target2realDeals);
+      }
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug(
+                "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
+                this.realDeal, this, target, target2realDeals);
+      }
+    }
+    return this.realDeal;
+  }
+
+  @Override
+  public TXStateInterface getRealDeal(DistributedMember t) {
+    assert t != null;
+    this.target = t;
+    this.realDeal = target2realDeals.get(target);
+    if (this.realDeal == null) {
+      this.realDeal = new DistPeerTXStateStub(this, target,
+          onBehalfOfClientMember);
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug(
+                "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
+                this.realDeal, this.txMgr.getDM().getId());
+      }
+      if (!this.realDeal.isDistTx()
+          || this.realDeal.isCreatedOnDistTxCoordinator()
+          || !this.realDeal.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED
+                .toLocalizedString("DistPeerTXStateStub", this.realDeal
+                    .getClass().getSimpleName()));
+      }
+      target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug("DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
+                + target2realDeals);
+      }
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug(
+                "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
+                this.realDeal, this, target, target2realDeals);
+      }
+    }
+    return this.realDeal;
+  }
+
+  /*
+   * [DISTTX] TODO Do some optimization
+   */
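+  /*
+   * Caches one target member per replicated region so that every op on the
+   * same RR within this TX is routed to the same replica.
+   */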
+  private DistributedMember getRRTarget(KeyInfo key, LocalRegion r) {
+    if (this.rrTargets == null) {
+      this.rrTargets = new HashMap<>();
+    }
+    DistributedMember m = this.rrTargets.get(r);
+    if (m == null) {
+      m = getOwnerForKey(r, key);
+      this.rrTargets.put(r, m);
+    }
+    return m;
+  }
+  
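+  /**
+   * Lazily computes the set of remote TX participants: every member with a
+   * stub in target2realDeals except the local member.
+   */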
+  private Set<DistributedMember> getTxRemoteParticpants(final DM dm) {
+    if (this.txRemoteParticpants == null) {
+      Set<DistributedMember> txParticpants = target2realDeals.keySet();
+      this.txRemoteParticpants = new HashSet<DistributedMember>(txParticpants);
+      // Remove local member from remote participant list
+      this.txRemoteParticpants.remove(dm.getId());
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator.getTxRemoteParticpants txParticpants = {} ,txRemoteParticpants={} ,originator={}",
+            txParticpants, this.txRemoteParticpants, dm.getId());
+      }
+    }
+    return txRemoteParticpants;
+  }
+
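+  /**
+   * Phase one of the distributed commit: warns if region membership changed
+   * under the TX, sends {@link DistTXPrecommitMessage} to every remote
+   * participant, precommits locally, and gathers the per-region entry version
+   * state from every copy. Returns the AND of all precommit responses.
+   */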
+  private boolean doPrecommit() {
+    boolean finalResult = true;
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("Applying Dist TX Precommit");
+    final DM dm = cache.getDistributionManager();
+
+    // Create Tx Participants
+    Set<DistributedMember> txParticpants = target2realDeals.keySet();
+    Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
+
+    // Determine if the set of VMs for any of the Regions for this TX have
+    // changed
+    DistributedRegion dr = null;
+    HashSet<LocalRegion> affectedRegions = new HashSet<LocalRegion>();
+    for (DistTXCoordinatorInterface distTXStateStub : target2realDeals.values()) {
+      affectedRegions.clear();
+      distTXStateStub.gatherAffectedRegions(affectedRegions, true, false);
+      for (LocalRegion lr : affectedRegions) {
+        if (lr.getScope().isLocal()) {
+          continue;
+        }
+        // [DISTTX] TODO what about PR?
+        if (lr instanceof DistributedRegion) {
+          dr = (DistributedRegion) lr;
+          CacheDistributionAdvisor adv = dr.getCacheDistributionAdvisor();
+          Set newRegionMemberView = adv.adviseTX();
+
+          if (!txParticpants.containsAll(newRegionMemberView)) {
+            logger
+                .warn(LocalizedMessage
+                    .create(
+                        LocalizedStrings.TXCommitMessage_NEW_MEMBERS_FOR_REGION_0_ORIG_LIST_1_NEW_LIST_2,
+                        new Object[] { dr, txParticpants, newRegionMemberView }));
+          }
+        }
+      }
+    }
+
+    // create processor and precommit message
+    DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor = new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(
+        this.getTxId(), dm, txRemoteParticpants, target2realDeals);
+    // TODO [DISTTX] what's the ack threshold?
+    processor.enableSevereAlertProcessing();
+    final DistTXPrecommitMessage precommitMsg = new DistTXPrecommitMessage(
+        this.getTxId(), this.onBehalfOfClientMember, processor);
+
+    // send precommit message to remote nodes
+    for (DistributedMember remoteNode : txRemoteParticpants) {
+      DistTXCoordinatorInterface remoteTXStateStub = target2realDeals
+          .get(remoteNode);
+      if (remoteTXStateStub.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistPeerTXStateStub", remoteTXStateStub.getClass()
+                    .getSimpleName()));
+      }
+      try {
+        remoteTXStateStub.setPrecommitMessage(precommitMsg, dm);
+        remoteTXStateStub.precommit();
+      } finally {
+        remoteTXStateStub.setPrecommitMessage(null, null);
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = "
+            + remoteNode);
+      }
+    }
+
+    // Do precommit on local node
+    TreeSet<String> sortedRegionName = new TreeSet<>();
+    DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
+    if (localTXState != null) {
+      if (!localTXState.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistTXStateOnCoordinator", localTXState.getClass()
+                    .getSimpleName()));
+      }
+      localTXState.precommit();
+      boolean localResult = localTXState.getPreCommitResponse();
+      TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap = new TreeMap<String, ArrayList<DistTxThinEntryState>>();
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
+      if (localResult) {
+        localResult = ((DistTXStateOnCoordinator) localTXState)
+            .populateDistTxEntryStateList(entryStateSortedMap);
+        if (localResult) {
+          entryEventList = new ArrayList<ArrayList<DistTxThinEntryState>>(
+              entryStateSortedMap.values());
+          populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName);
+        }
+      }
+
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator.doPrecommit local = {} ,entryEventList={} ,txRegionVersionsMap={} ,result= {} ,finalResult-old= {}",
+            dm.getId(), printEntryEventList(entryEventList),
+            printEntryEventMap(this.txEntryEventMap), localResult, finalResult);
+      }
+      finalResult = finalResult && localResult;
+    }
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualSend != null) {
+    // internalAfterIndividualSend.run();
+    // }
+
+    /*
+     * [DISTTX] TODO see how to handle exception
+     */
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualCommitProcess != null) {
+    // // Testing callback
+    // internalAfterIndividualCommitProcess.run();
+    // }
+
+    { // Wait for results
+      dm.getCancelCriterion().checkCancelInProgress(null);
+      processor.waitForPrecommitCompletion();
+
+      // [DISTTX] TODO Handle stats
+      // dm.getStats().incCommitWaits();
+
+      Map<DistributedMember, DistTxPrecommitResponse> remoteResults = processor
+          .getCommitResponseMap();
+      for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults
+          .entrySet()) {
+        DistributedMember target = e.getKey();
+        DistTxPrecommitResponse remoteResponse = e.getValue();
+        ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = remoteResponse.getDistTxEntryEventList();
+        populateEntryEventMap(target, entryEventList, sortedRegionName);
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = {} ,sortedRegions={} ,entryEventList={} ,txEntryEventMap={} ,result= {} ,finalResult-old= {}",
+              target, sortedRegionName, printEntryEventList(entryEventList),
+              printEntryEventMap(this.txEntryEventMap),
+              remoteResponse.getCommitState(), finalResult);
+        }
+        finalResult = finalResult && remoteResponse.getCommitState();
+      }
+    }
+
+    /*
+     * [DISTTX] TODO Write similar method to take out exception
+     * 
+     * [DISTTX] TODO Handle Reliable regions
+     */
+    // if (this.hasReliableRegions) {
+    // checkDistributionReliability(distMap, processor);
+    // }
+
+    if (logger.isDebugEnabled()) {
+      logger
+          .debug("DistTXStateProxyImplOnCoordinator.doPrecommit finalResult= "
+              + finalResult);
+    }
+    return finalResult;
+  }
+
+  /*
+   * Handles the response of a precommit reply: walks the list of region
+   * version lists received from this target and fills txEntryEventMap, keyed
+   * by region path in sorted order.
+   */
+  private void populateEntryEventMap(DistributedMember target,
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList,
+      TreeSet<String> sortedRegionName) {
+    if (this.txEntryEventMap == null) {
+      this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>();
+    }
+
+    DistTXCoordinatorInterface distTxIface = target2realDeals.get(target);
+    if (distTxIface.getPrimaryTransactionalOperations() != null
+        && distTxIface.getPrimaryTransactionalOperations().size() > 0) {
+      sortedRegionName.clear();
+      distTxIface.gatherAffectedRegionsName(sortedRegionName, true, false);
+
+      if (sortedRegionName.size() != entryEventList.size()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("size of "
+                + sortedRegionName.size() + " {" + sortedRegionName + "}"
+                + " for target=" + target, entryEventList.size() + " {"
+                + entryEventList + "}"));
+      }
+
+      int index = 0;
+      // Get region as per sorted order of region path
+      for (String rName : sortedRegionName) {
+        txEntryEventMap.put(rName, entryEventList.get(index++));
+      }
+    }
+  }
+  
+  /*
+   * Populates, in sorted region order, the list of entry states for the
+   * regions hosted on this target; used while sending commit messages.
+   */
+  private void populateEntryEventList(DistributedMember target,
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList,
+      TreeSet<String> sortedRegionMap) {
+    DistTXCoordinatorInterface distTxItem = target2realDeals.get(target);
+    sortedRegionMap.clear();
+    distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true);
+
+    // Get region as per sorted order of region path
+    entryEventList.clear();
+    for (String rName : sortedRegionMap) {
+      ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap
+          .get(rName);
+      if (entryStates == null) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "entryStates for " + rName + " at target " + target, "null"));
+      }
+      entryEventList.add(entryStates);
+    }
+  }
+
+  /*
+   * [DISTTX] TODO - Handle result TXMessage
+   */
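+  /**
+   * Phase two of the distributed commit: ships the per-region entry version
+   * state gathered during precommit to each remote participant via
+   * {@link DistTXCommitMessage}, commits locally, and AND-combines the
+   * {@link TXCommitMessage} results from all members.
+   */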
+  private boolean doCommit() {
+    boolean finalResult = true;
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("Applying Dist TX Commit");
+    final DM dm = cache.getDistributionManager();
+
+    // Create Tx Participants
+    Set<DistributedMember> txRemoteParticpants = getTxRemoteParticpants(dm);
+
+    // create processor and commit message
+    DistTXCommitMessage.DistTxCommitReplyProcessor processor = new DistTXCommitMessage.DistTxCommitReplyProcessor(
+        this.getTxId(), dm, txRemoteParticpants, target2realDeals);
+    // TODO [DISTTX] what's the ack threshold?
+    processor.enableSevereAlertProcessing();
+    final DistTXCommitMessage commitMsg = new DistTXCommitMessage(
+        this.getTxId(), this.onBehalfOfClientMember, processor);
+
+    // send commit message to remote nodes
+    ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>();
+    TreeSet<String> sortedRegionName = new TreeSet<>();
+    for (DistributedMember remoteNode : txRemoteParticpants) {
+      DistTXCoordinatorInterface remoteTXStateStub = target2realDeals
+          .get(remoteNode);
+      if (remoteTXStateStub.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistPeerTXStateStub", remoteTXStateStub.getClass()
+                    .getSimpleName()));
+      }
+      try {
+        populateEntryEventList(remoteNode, entryEventList,
+            sortedRegionName);
+        commitMsg.setEntryStateList(entryEventList);
+        remoteTXStateStub.setCommitMessage(commitMsg, dm);
+        remoteTXStateStub.commit();
+      } finally {
+        remoteTXStateStub.setCommitMessage(null, null);
+        remoteTXStateStub.finalCleanup();
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = {} ,sortedRegions={} ,entryEventList={} ,txEntryEventMap={}",
+            remoteNode, sortedRegionName, printEntryEventList(entryEventList),
+            printEntryEventMap(this.txEntryEventMap));
+      }
+    }
+
+    // Do commit on local node
+    DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
+    if (localTXState != null) {
+      if (!localTXState.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistTXStateOnCoordinator", localTXState.getClass()
+                    .getSimpleName()));
+      }
+      populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
+      ((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
+      localTXState.commit();
+      TXCommitMessage localResultMsg = localTXState.getCommitMessage();
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator.doCommit local = {} ,sortedRegions={} ,entryEventList={} ,txEntryEventMap={} ,result= {} ,finalResult-old= {}",
+            dm.getId(), sortedRegionName, printEntryEventList(entryEventList),
+            printEntryEventMap(this.txEntryEventMap), localResultMsg != null,
+            finalResult);
+      }
+      finalResult = finalResult && (localResultMsg != null);
+    }
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualSend != null) {
+    // internalAfterIndividualSend.run();
+    // }
+
+    /*
+     * [DISTTX] TODO see how to handle exception
+     */
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualCommitProcess != null) {
+    // // Testing callback
+    // internalAfterIndividualCommitProcess.run();
+    // }
+
+    { // Wait for results
+      dm.getCancelCriterion().checkCancelInProgress(null);
+      processor.waitForPrecommitCompletion();
+
+      // [DISTTX] TODO Handle stats
+      dm.getStats().incCommitWaits();
+
+      Map<DistributedMember, TXCommitMessage> remoteResults = processor
+          .getCommitResponseMap();
+      for (Entry<DistributedMember, TXCommitMessage> e : remoteResults
+          .entrySet()) {
+        DistributedMember target = e.getKey();
+        TXCommitMessage remoteResultMsg = e.getValue();
+        if (logger.isDebugEnabled()) { // TODO - make this trace level
+          logger.debug(
+              "DistTXStateProxyImplOnCoordinator.doCommit got results from target = {} ,result= {} ,finalResult-old= {}",
+              target, remoteResultMsg != null, finalResult);
+        }
+        finalResult = finalResult && remoteResultMsg != null;
+      }
+    }
+
+    /*
+     * [DISTTX] TODO Write similar method to take out exception
+     * 
+     * [DISTTX] TODO Handle Reliable regions
+     */
+    // if (this.hasReliableRegions) {
+    // checkDistributionReliability(distMap, processor);
+    // }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXStateProxyImplOnCoordinator.doCommit finalResult= "
+          + finalResult);
+    }
+    return finalResult;
+  }
+  
+  /**
+   * For distributed transactions, this divides the user's putAll operation
+   * into multiple per-bucket putAll ops (each carrying the entries that fall
+   * in that bucket) and then fires those using the appropriate TXStateStub
+   * (for the target that hosts the corresponding bucket).
+   */
+  @Override
+  public void postPutAll(DistributedPutAllOperation putallOp,
+      VersionedObjectList successfulPuts, LocalRegion region) {
+    if (putallOp.putAllData.length == 0) {
+      return;
+    }
+    if (region instanceof DistributedRegion) {
+      super.postPutAll(putallOp, successfulPuts, region);
+    } else {
+      region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
+                                                               // #43651
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll "
+            + "processing putAll op for region {}, size of putAllOp "
+            + "is {}", region, putallOp.putAllData.length);
+      }
+
+
+      //map of bucketId to putall op for this bucket
+      HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap = 
+          new HashMap<Integer, DistributedPutAllOperation>();
+      //map of bucketId to TXStateStub for target that hosts this bucket
+      HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap = 
+          new HashMap<Integer, DistTXCoordinatorInterface>();
+      
+      //separate the putall op per bucket
+      for (int i=0; i<putallOp.putAllData.length; i++) {
+        assert (putallOp.putAllData[i] != null);
+        Object key = putallOp.putAllData[i].key;
+        int bucketId = putallOp.putAllData[i].getBucketId();
+        
+        DistributedPutAllOperation putAllForBucket =
+            bucketToPutallMap.get(bucketId);
+        if (putAllForBucket == null) {
+          EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
+              Operation.PUTALL_CREATE, key,
+              putallOp.putAllData[i].getValue());
+          event.setEventId(putallOp.putAllData[i].getEventID());
+          putAllForBucket = new DistributedPutAllOperation(
+              event, putallOp.putAllDataSize, putallOp.isBridgeOp);
+          bucketToPutallMap.put(bucketId, putAllForBucket);
+        } 
+        putAllForBucket.addEntry(putallOp.putAllData[i]);
+
+        KeyInfo ki = new KeyInfo(key, null, null);
+        DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, region);
+        bucketToTxStateStubMap.put(bucketId, tsi);
+      }
+      
+      // fire a putAll operation for each bucket using the appropriate
+      // TXStateStub (for the target that hosts this bucket)
+
+      // [DISTTX] [TODO] Perf: Can this be further optimized?
+      // This sends putAll in a loop to each target bucket (and waits for ack)
+      // one after another. Could we send the respective putAll messages to all
+      // targets using the same reply processor and wait on it?
+      for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap
+          .entrySet()) {
+        Integer bucketId = e.getKey();
+        DistTXCoordinatorInterface dtsi = e.getValue();
+        DistributedPutAllOperation putAllForBucket = bucketToPutallMap
+            .get(bucketId);
+        
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll processing"
+              + " putAll for ##bucketId = {}, ##txStateStub = {}, "
+              + "##putAllOp = {}"
+              , bucketId, dtsi, putAllForBucket);
+        }
+        dtsi.postPutAll(putAllForBucket, successfulPuts, region);
+      }
+    }
+  }
+
+  /**
+   * For distributed transactions, this divides the user's removeAll operation
+   * into multiple per-bucket removeAll ops (each carrying the entries to be
+   * removed from that bucket) and then fires those using the appropriate
+   * TXStateStub (for the target that hosts the corresponding bucket).
+   */
+  @Override
+  public void postRemoveAll(DistributedRemoveAllOperation op,
+      VersionedObjectList successfulOps, LocalRegion region) {
+    if (op.removeAllData.length == 0) {
+      return;
+    }
+    if (region instanceof DistributedRegion) {
+      super.postRemoveAll(op, successfulOps, region);
+    } else {
+      region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
+                                                               // #43651
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll "
+            + "processing removeAll op for region {}, size of removeAll "
+            + "is {}", region, op.removeAllDataSize);
+      }
+      
+      //map of bucketId to removeAll op for this bucket
+      HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap = 
+          new HashMap<Integer, DistributedRemoveAllOperation>();
+      //map of bucketId to TXStateStub for target that hosts this bucket
+      HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap = 
+          new HashMap<Integer, DistTXCoordinatorInterface>();
+      
+      //separate the removeAll op per bucket
+      for (int i=0; i<op.removeAllData.length; i++) {
+        assert (op.removeAllData[i] != null);
+        Object key = op.removeAllData[i].key;
+        int bucketId = op.removeAllData[i].getBucketId();
+        
+        DistributedRemoveAllOperation removeAllForBucket = 
+            bucketToRemoveAllMap.get(bucketId);
+        if (removeAllForBucket == null) {
+          EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
+              Operation.REMOVEALL_DESTROY, key, null);
+          removeAllForBucket = new DistributedRemoveAllOperation(
+              event, op.removeAllDataSize, op.isBridgeOp);
+          bucketToRemoveAllMap.put(bucketId, removeAllForBucket);
+        } 
+        removeAllForBucket.addEntry(op.removeAllData[i]);
+
+        KeyInfo ki = new KeyInfo(key, null, null);
+        DistTXCoordinatorInterface tsi = (DistTXCoordinatorInterface) getRealDeal(ki, region);
+        bucketToTxStateStubMap.put(bucketId, tsi);
+      }
+      
+      // fire a removeAll operation for each bucket using the appropriate
+      // TXStateStub (for the target that hosts this bucket)
+
+      // [DISTTX] [TODO] Perf: Can this be further optimized?
+      // This sends removeAll in a loop to each target bucket (and waits for
+      // the ack) one after another. Could we send the respective removeAll
+      // messages to all targets using the same reply processor and wait on it?
+      for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap
+          .entrySet()) {
+        Integer bucketId = e.getKey();
+        DistTXCoordinatorInterface dtsi = e.getValue();
+        DistributedRemoveAllOperation removeAllForBucket = bucketToRemoveAllMap
+            .get(bucketId);
+        
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
+              + " removeAll for ##bucketId = {}, ##txStateStub = {}, "
+              + "##removeAllOp = {}"
+              , bucketId, dtsi, removeAllForBucket);
+        }
+        dtsi.postRemoveAll(removeAllForBucket, successfulOps, region);
+      }
+
+    }
+  }
+  
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return true;
+  }
+  
+  public static String printEntryEventMap(
+      HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
+    StringBuilder str = new StringBuilder();
+    str.append(" (");
+    str.append(txRegionVersionsMap.size());
+    str.append(")=[ ");
+    for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap
+        .entrySet()) {
+      str.append(" {").append(entry.getKey());
+      str.append(":").append("size(").append(entry.getValue().size())
+          .append(")");
+      str.append("=").append(entry.getValue()).append("}, ");
+    }
+    str.append(" } ");
+    return str.toString();
+  }
+  
+  public static String printEntryEventList(
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+    StringBuilder str = new StringBuilder();
+    str.append(" (");
+    str.append(entryEventList.size());
+    str.append(")=[ ");
+    for (ArrayList<DistTxThinEntryState> entry : entryEventList) {
+      str.append(" ( ");
+      str.append(entry.size());
+      str.append(" )={").append(entry);
+      str.append(" } ");
+    }
+    str.append(" ] ");
+    return str.toString();
+  }
+  
+  /*
+   * Never returns null; falls back to the local distributed member.
+   */
+  public DistributedMember getOwnerForKey(LocalRegion r, KeyInfo key) {
+    DistributedMember m = r.getOwnerForKey(key);
+    if (m == null) {
+      GemFireCacheImpl cache = GemFireCacheImpl.getExisting("getOwnerForKey");
+      m = cache.getDistributedSystem().getDistributedMember();
+    }
+    return m;
+  }
+}
\ No newline at end of file
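
A note on the pattern above: postPutAll and postRemoveAll both reduce to the
same shape, namely group a flat entry array by bucket id, resolve one
coordinator stub per bucket, then dispatch each per-bucket operation serially.
A minimal, self-contained sketch of that shape, where the element type, the
stub type, and the dispatch callback are illustrative placeholders rather than
GemFire classes:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiConsumer;
    import java.util.function.IntFunction;
    import java.util.function.ToIntFunction;

    final class BucketFanOut {
      // Groups entries by bucket id, resolves one stub per bucket, then
      // dispatches each per-bucket group through its stub, one bucket at a time.
      static <E, S> void fanOut(E[] entries, ToIntFunction<E> bucketOf,
          IntFunction<S> stubForBucket, BiConsumer<S, List<E>> dispatch) {
        Map<Integer, List<E>> byBucket = new HashMap<>();
        Map<Integer, S> stubs = new HashMap<>();
        for (E e : entries) {
          int b = bucketOf.applyAsInt(e);
          byBucket.computeIfAbsent(b, k -> new ArrayList<>()).add(e);
          stubs.computeIfAbsent(b, stubForBucket::apply); // one stub per bucket
        }
        // Serial dispatch, matching the loop above (and its TODO about instead
        // sending to all targets at once on a shared reply processor).
        for (Map.Entry<Integer, List<E>> g : byBucket.entrySet()) {
          dispatch.accept(stubs.get(g.getKey()), g.getValue());
        }
      }
    }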

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java
new file mode 100644
index 0000000..8750158
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java
@@ -0,0 +1,112 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+public class DistTXStateProxyImplOnDatanode extends DistTXStateProxyImpl {
+
+  private DistTXPrecommitMessage preCommitMessage = null;
+  private boolean preCommitResponse = false;
+  
+  public DistTXStateProxyImplOnDatanode(TXManagerImpl managerImpl, TXId id,
+      InternalDistributedMember clientMember) {
+    super(managerImpl, id, clientMember);
+  }
+
+  public DistTXStateProxyImplOnDatanode(TXManagerImpl managerImpl, TXId id,
+      boolean isjta) {
+    super(managerImpl, id, isjta);
+  }
+
+  @Override
+  public TXStateInterface getRealDeal(KeyInfo key, LocalRegion r) {
+    if (this.realDeal == null) {
+      this.realDeal = new DistTXState(this, false);
+      if (r != null) {
+        // wait for the region to be initialized; fixes bug 44652
+        r.waitOnInitialization(r.initializationLatchBeforeGetInitialImage);
+        target = r.getOwnerForKey(key);
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("Built a new DistTXState: {} me:{}", this.realDeal,
+            this.txMgr.getDM().getId());
+      }
+    }
+    return this.realDeal;
+  }
+
+  @Override
+  public TXStateInterface getRealDeal(DistributedMember t) {
+    assert t != null;
+    if (this.realDeal == null) {
+      this.target = t;
+      this.realDeal = new DistTXState(this, false);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Built a new DistTXState: {} me:{}", this.realDeal,
+            this.txMgr.getDM().getId());
+      }
+    }
+    return this.realDeal;
+  }
+  
+  private DistTXState getRealDeal()
+      throws UnsupportedOperationInTransactionException {
+    if (this.realDeal == null || !this.realDeal.isDistTx()
+        || !this.realDeal.isTxState()
+        || this.realDeal.isCreatedOnDistTxCoordinator()) {
+      throw new UnsupportedOperationInTransactionException(
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+              "DistTXStateOnDatanode", this.realDeal != null ? this.realDeal
+                  .getClass().getSimpleName() : "null"));
+    }
+    return (DistTXState) this.realDeal;
+  }
+  
+  @Override
+  public void precommit() throws CommitConflictException,
+      UnsupportedOperationInTransactionException {
+    try {
+      DistTXState txState = getRealDeal();
+      boolean retVal = txState.applyOpsOnRedundantCopy(
+          this.preCommitMessage.getSender(),
+          this.preCommitMessage.getSecondaryTransactionalOperations());
+      if (retVal) {
+        setCommitOnBehalfOfRemoteStub(true);
+        txState.precommit();
+      }
+      this.preCommitResponse = retVal; // assigned last, only if no exception was thrown
+    } catch (UnsupportedOperationInTransactionException e) {
+      throw e;
+    } finally {
+      inProgress = true;
+    }
+  }
+
+  public void setPreCommitMessage(DistTXPrecommitMessage preCommitMessage) {
+    this.preCommitMessage = preCommitMessage;
+  }
+
+  public boolean getPreCommitResponse() {
+    return preCommitResponse;
+  }
+  
+  /*
+   * Populate the list of versions for each region while replying to precommit
+   */
+  public boolean populateDistTxEntryStateList(
+      TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap) {
+    return getRealDeal().populateDistTxEntryStateList(entryStateSortedMap);
+  }
+  
+  public void populateDistTxEntryStates(
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+    getRealDeal().setDistTxEntryStates(entryEventList);
+  }
+}
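
The precommit() above encodes a strict ordering: apply the coordinator's
secondary operations on the redundant copy first, precommit only if that
succeeds, and record the response last so that an exception leaves it false.
A hedged sketch of that control flow, with TxState and its methods as
simplified stand-ins for the real DistTXState signatures:

    // Sketch of the datanode precommit ordering (placeholder types/names).
    final class PrecommitStep {
      interface TxState {
        boolean applyOpsOnRedundantCopy(); // stand-in for the real signature
        void precommit();
      }

      // Returns the value that would be sent back as the precommit response.
      static boolean run(TxState state) {
        boolean applied = state.applyOpsOnRedundantCopy();
        if (applied) {
          state.precommit(); // only precommit once the copy is consistent
        }
        return applied;      // assigned last, only if nothing threw
      }
    }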

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index 9dc24af..d557be6 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -51,6 +51,8 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllMessage;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
 import com.gemstone.gemfire.internal.cache.UpdateOperation.UpdateMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage;
@@ -63,6 +65,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.OffHeapReference;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
 
 /**
@@ -102,6 +106,50 @@ public abstract class DistributedCacheOperation {
    * @since 5.7
    */
   public static final byte DESERIALIZATION_POLICY_LAZY = (byte)2;
+  
+  /**
+   * @param deserializationPolicy must be one of the following: DESERIALIZATION_POLICY_NONE, DESERIALIZATION_POLICY_EAGER, DESERIALIZATION_POLICY_LAZY.
+   */
+  public static void writeValue(final byte deserializationPolicy, final Object vObj, final byte[] vBytes, final DataOutput out) throws IOException {
+    if (vObj != null) {
+      if (deserializationPolicy == DESERIALIZATION_POLICY_EAGER) {
+        // for DESERIALIZATION_POLICY_EAGER avoid extra byte array serialization
+        DataSerializer.writeObject(vObj, out);
+      } else if (deserializationPolicy == DESERIALIZATION_POLICY_NONE) {
+        // We only have NONE with a vObj when vObj is off-heap and not serialized.
+        OffHeapReference ohref = (OffHeapReference) vObj;
+        assert !ohref.isSerialized();
+        DataSerializer.writeByteArray(ohref.getValueAsHeapByteArray(), out);
+      } else { // LAZY
+        // TODO OFFHEAP MERGE: cache the oldValue that is serialized here
+        // into the event
+        DataSerializer.writeObjectAsByteArray(vObj, out);
+      }
+    } else {
+      if (deserializationPolicy == DESERIALIZATION_POLICY_EAGER) {
+        // object is already in serialized form in the byte array.
+        // So just write the bytes to the stream.
+        // fromData will call readObject which will deserialize to object form.
+        out.write(vBytes);
+      } else {
+        DataSerializer.writeByteArray(vBytes, out);
+      }
+    }    
+  }
+  // static values describing the form of the old value (byte[], serialized, or object)
+  public static final byte VALUE_IS_BYTES = 0;
+  public static final byte VALUE_IS_SERIALIZED_OBJECT = 1;
+  public static final byte VALUE_IS_OBJECT = 2;
+
+  /**
+   * Convert the given oldValueIsSerialized flag to the corresponding DESERIALIZATION_POLICY_* constant.
+   */
+  public static byte valueIsToDeserializationPolicy(boolean oldValueIsSerialized) {
+    if (!oldValueIsSerialized) return DESERIALIZATION_POLICY_NONE;
+    if (CachedDeserializableFactory.preferObject()) return DESERIALIZATION_POLICY_EAGER;
+    return DESERIALIZATION_POLICY_LAZY;
+  }
+
 
   public final static byte DESERIALIZATION_POLICY_NUMBITS =
           DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
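
writeValue() above makes the wire format policy-dependent: EAGER streams the
object directly, while NONE and LAZY both go out as a length-prefixed byte
array (NONE from the off-heap heap copy, LAZY via writeObjectAsByteArray).
A sketch of what a matching reader has to do, assuming the receiver learns
the policy from elsewhere in the message; this is an illustration, not the
actual GemFire decode path:

    import java.io.DataInput;
    import java.io.IOException;
    import com.gemstone.gemfire.DataSerializer;

    final class ValueReader {
      // Hypothetical read-side counterpart to writeValue(); the policy byte
      // must already be known to the receiver.
      static Object readValue(byte deserializationPolicy, DataInput in)
          throws IOException, ClassNotFoundException {
        if (deserializationPolicy == DistributedCacheOperation.DESERIALIZATION_POLICY_EAGER) {
          return DataSerializer.readObject(in); // written with writeObject()
        }
        // NONE and LAZY were both written as a byte array; LAZY defers
        // deserialization to whoever eventually needs the object form.
        return DataSerializer.readByteArray(in);
      }
    }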
@@ -792,7 +840,7 @@ public abstract class DistributedCacheOperation {
 
   public static abstract class CacheOperationMessage extends
       SerialDistributionMessage implements MessageWithReply,
-      DirectReplyMessage, ReliableDistributionData {
+      DirectReplyMessage, ReliableDistributionData, OldValueImporter {
 
     protected final static short POSSIBLE_DUPLICATE_MASK = POS_DUP;
     protected final static short OLD_VALUE_MASK =
@@ -810,6 +858,10 @@ public abstract class DistributedCacheOperation {
 
     private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
 
+    protected final static short FETCH_FROM_HDFS = 0x200;
+    
+    protected final static short IS_PUT_DML = 0x100;
+
     public boolean needsRouting;
 
     protected String regionPath;
@@ -906,20 +958,18 @@ public abstract class DistributedCacheOperation {
      * @param event the entry event that contains the old value
      */
     public void appendOldValueToMessage(EntryEventImpl event) {
-      Object val = event.getRawOldValue();
-      if (val == Token.NOT_AVAILABLE ||
-          val == Token.REMOVED_PHASE1 ||
-          val == Token.REMOVED_PHASE2 ||
-          val == Token.DESTROYED ||
-          val == Token.TOMBSTONE) {
-        return;
-      }
-      if (val instanceof CachedDeserializable) {
-        val = ((CachedDeserializable)val).getValue();
+      {
+        @Unretained Object val = event.getRawOldValue();
+        if (val == null ||
+            val == Token.NOT_AVAILABLE ||
+            val == Token.REMOVED_PHASE1 ||
+            val == Token.REMOVED_PHASE2 ||
+            val == Token.DESTROYED ||
+            val == Token.TOMBSTONE) {
+          return;
+        }
       }
-      this.oldValue = val;
-      this.hasOldValue = true;
-      this.oldValueIsSerialized = (val instanceof byte[]);
+      event.exportOldValue(this);
     }
     
     /**
@@ -977,6 +1027,12 @@ public abstract class DistributedCacheOperation {
     public boolean containsRegionContentChange() {
       return true;
     }
+    
+    protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
+      Assert.assertTrue(this.regionPath != null, "regionPath was null");
+      GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem());
+      return gfc.getRegionByPathForProcessing(this.regionPath);
+    }
 
     @Override
     protected final void process(final DistributionManager dm) {
@@ -998,12 +1054,7 @@ public abstract class DistributedCacheOperation {
           return;
         }
 
-        Assert.assertTrue(this.regionPath != null, "regionPath was null");
-
-        GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm
-            .getSystem());
-        final LocalRegion lclRgn = gfc
-            .getRegionByPathForProcessing(this.regionPath);
+        final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
         basicProcess(dm, lclRgn);
       } catch (CancelException e) {
@@ -1084,6 +1135,7 @@ public abstract class DistributedCacheOperation {
         }
 
         event = createEvent(rgn);
+        try {
         boolean isEntry = event.getOperation().isEntry();
 
         if (isEntry && this.possibleDuplicate) {
@@ -1105,6 +1157,11 @@ public abstract class DistributedCacheOperation {
         }
         
         sendReply = operateOnRegion(event, dm) && sendReply;
+        } finally {
+          if (event instanceof EntryEventImpl) {
+            ((EntryEventImpl) event).release();
+          }
+        }
       } catch (RegionDestroyedException e) {
         this.closed = true;
         if (logger.isDebugEnabled()) {
@@ -1287,12 +1344,15 @@ public abstract class DistributedCacheOperation {
       this.hasDelta = (bits & DELTA_MASK) != 0;
       this.hasOldValue = (bits & OLD_VALUE_MASK) != 0;
       if (this.hasOldValue) {
-        // below boolean is not strictly required, but this is for compatibility
-        // with SQLFire code which writes as byte here to indicate whether
-        // oldValue is an object, serialized object or byte[]
-        in.readByte();
+        byte b = in.readByte();
+        if (b == 0) {
+          this.oldValueIsSerialized = false;
+        } else if (b == 1) {
+          this.oldValueIsSerialized = true;
+        } else {
+          throw new IllegalStateException("expected 0 or 1");
+        }
         this.oldValue = DataSerializer.readByteArray(in);
-        this.oldValueIsSerialized = true;
       }
       boolean hasFilterInfo = (bits & FILTER_INFO_MASK) != 0;
       this.needsRouting = (bits & NEEDS_ROUTING_MASK) != 0;
@@ -1306,6 +1366,10 @@ public abstract class DistributedCacheOperation {
       }
       if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) {
         this.inhibitAllNotifications = true;
+        if (this instanceof PutAllMessage) {
+          ((PutAllMessage) this).setFetchFromHDFS((extBits & FETCH_FROM_HDFS) != 0);
+          ((PutAllMessage) this).setPutDML((extBits & IS_PUT_DML) != 0);
+        }
       }
     }
 
@@ -1332,18 +1396,20 @@ public abstract class DistributedCacheOperation {
         DataSerializer.writeObject(this.callbackArg, out);
       }
       if (this.hasOldValue) {
-        // below boolean is not strictly required, but this is for compatibility
-        // with SQLFire code which writes as byte here to indicate whether
-        // oldValue is an object, serialized object or byte[]
         out.writeByte(this.oldValueIsSerialized ? 1 : 0);
         // the receiving side expects that the old value will have been serialized
         // as a byte array
-        if (this.oldValueIsSerialized) {
-          DataSerializer.writeByteArray((byte[])this.oldValue, out);
-        }
-        else {
-          DataSerializer.writeObjectAsByteArray(this.oldValue, out);
+        final byte policy = valueIsToDeserializationPolicy(this.oldValueIsSerialized);
+        final Object vObj;
+        final byte[] vBytes;
+        if (!this.oldValueIsSerialized && this.oldValue instanceof byte[]) {
+          vObj = null;
+          vBytes = (byte[])this.oldValue;
+        } else {
+          vObj = this.oldValue;
+          vBytes = null;
         }
+        writeValue(policy, vObj, vBytes, out);
       }
       if (this.filterRouting != null) {
         InternalDataSerializer.invokeToData(this.filterRouting, out);
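
Taken together with the fromData() change earlier in this file, the old-value
encoding is now one flag byte (strictly 0 or 1 for oldValueIsSerialized)
followed by a byte array. The round trip in isolation, using plain java.io
streams with a simplified length prefix instead of DataSerializer:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    final class OldValueWireFormat {
      static byte[] write(boolean serialized, byte[] valueBytes) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(serialized ? 1 : 0); // flag byte, strictly 0 or 1
        out.writeInt(valueBytes.length);   // simplified vs. writeByteArray
        out.write(valueBytes);
        return bos.toByteArray();
      }

      static byte[] read(byte[] wire, boolean[] serializedOut) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
        byte b = in.readByte();
        if (b != 0 && b != 1) {
          throw new IllegalStateException("expected 0 or 1"); // mirrors fromData
        }
        serializedOut[0] = (b == 1);
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        return value;
      }
    }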
@@ -1427,6 +1493,50 @@ public abstract class DistributedCacheOperation {
     public void setSendDelta(boolean sendDelta) {
       this.sendDelta = sendDelta;
     }
+
+    @Override
+    public boolean prefersOldSerialized() {
+      return true;
+    }
+
+    @Override
+    public boolean isUnretainedOldReferenceOk() {
+      return true;
+    }
+
+    @Override
+    public boolean isCachedDeserializableValueOk() {
+      return false;
+    }
+
+    @Override
+    public void importOldObject(Object ov, boolean isSerialized) {
+      this.oldValueIsSerialized = isSerialized;
+      this.oldValue = ov;
+      this.hasOldValue = true;
+    }
+
+    @Override
+    public void importOldBytes(byte[] ov, boolean isSerialized) {
+      this.oldValueIsSerialized = isSerialized;
+      this.oldValue = ov;
+      this.hasOldValue = true;
+    }
+
+    protected final boolean _mayAddToMultipleSerialGateways(DistributionManager dm) {
+      int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); 
+      try {
+        LocalRegion lr = getLocalRegionForProcessing(dm);
+        if (lr == null) {
+          return false;
+        }
+        return lr.notifiesMultipleSerialGateways();
+      } catch (CancelException ignore) {
+        return false;
+      } finally {
+        LocalRegion.setThreadInitLevelRequirement(oldLevel);
+      }
+    }
   }
 
   /** Custom subclass that keeps all ReplyExceptions */
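
The importOldObject/importOldBytes methods above replace the old
appendOldValueToMessage logic that unwrapped CachedDeserializable itself: the
event now pushes the old value into the message via exportOldValue, choosing
the object or byte form based on the importer's stated preferences. The
essential handshake, sketched with simplified stand-in types rather than the
real OldValueImporter interface:

    // Simplified shape of the exporter/importer handshake (illustrative only).
    interface OldValueSink {
      boolean prefersOldSerialized();
      void importOldObject(Object oldValue, boolean isSerialized);
      void importOldBytes(byte[] oldValue, boolean isSerialized);
    }

    final class EventSketch {
      private Object rawOldValue;

      // The event decides which form to hand over, honoring the sink's
      // preferences, so callers never touch value internals directly.
      void exportOldValue(OldValueSink sink) {
        if (rawOldValue instanceof byte[] && sink.prefersOldSerialized()) {
          sink.importOldBytes((byte[]) rawOldValue, true);
        } else if (rawOldValue != null) {
          sink.importOldObject(rawOldValue, false);
        }
      }
    }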

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
index 972d43b..8a9434d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
@@ -105,6 +105,23 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
 	  return putAllData;
   }
   
+  public void setPutAllEntryData(PutAllEntryData[] putAllEntryData) {
+    for (int i = 0; i < putAllEntryData.length; i++) {
+      putAllData[i] = putAllEntryData[i];
+    }
+    this.putAllDataSize = putAllEntryData.length;
+  }
+  
+  /**
+   * Add an entry that this putall operation should distribute.
+   */
+  public void addEntry(PutAllEntryData putAllEntry)
+  {
+    this.putAllData[this.putAllDataSize] = putAllEntry;
+    this.putAllDataSize += 1;
+    //cachedEvents.add(ev);
+  }
+  
   /**
    * Add an entry that this putall operation should distribute.
    */
@@ -179,6 +196,17 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     };
   }
   
+  public void freeOffHeapResources() {
+    // I do not use eventIterator here because it forces the lazy creation of EntryEventImpl by calling getEventForPosition.
+    for (int i=0; i < this.putAllDataSize; i++) {
+      PutAllEntryData entry = this.putAllData[i];
+      if (entry != null && entry.event != null) {
+        entry.event.release();
+      }
+    }
+  }
+  
+  
   public EntryEventImpl getEventForPosition(int position) {
     PutAllEntryData entry = this.putAllData[position];
     if (entry == null) {
@@ -188,7 +216,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       return entry.event;
     }
     LocalRegion region = (LocalRegion)this.event.getRegion();
-    EntryEventImpl ev = new EntryEventImpl(
+    EntryEventImpl ev = EntryEventImpl.create(
         region,
         entry.getOp(),
         entry.getKey(), null/* value */, this.event.getCallbackArgument(),
@@ -196,6 +224,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
         this.event.getDistributedMember(),
         this.event.isGenerateCallbacks(),
         entry.getEventID());
+    boolean returnedEv = false;
+    try {
     ev.setPossibleDuplicate(entry.isPossibleDuplicate());
     if (entry.versionTag != null && region.concurrencyChecksEnabled) {
       VersionSource id = entry.versionTag.getMemberID();
@@ -207,6 +237,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     }
       
     entry.event = ev;
+    returnedEv = true;
     if (entry.getValue() == null && ev.getRegion().getAttributes().getDataPolicy() == DataPolicy.NORMAL) {
       ev.setLocalInvalid(true);
     }
@@ -223,6 +254,11 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     ev.callbacksInvoked(entry.isCallbacksInvoked());
     ev.setTailKey(entry.getTailKey());
     return ev;
+    } finally {
+      if (!returnedEv) {
+        ev.release();
+      }
+    }
   }
 
   public final EntryEventImpl getBaseEvent() {
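
getEventForPosition above introduces the pattern this patch repeats
throughout: EntryEventImpl.create(...) paired with release() in a finally
block, skipped only when the event is successfully handed to the caller.
Reduced to its essentials, with Event as a stand-in type:

    import java.util.function.Consumer;
    import java.util.function.Supplier;

    // Minimal sketch of the create()/release() idiom (Event is a stand-in
    // for EntryEventImpl, not GemFire API).
    final class EventScope {
      interface Event {
        void release(); // decrements the off-heap reference count
      }

      // If the event escapes to the caller, ownership transfers and it must
      // NOT be released here; if population throws first, it must be.
      static Event buildAndHandOff(Supplier<Event> create, Consumer<Event> populate) {
        Event ev = create.get();
        boolean returnedEv = false;
        try {
          populate.accept(ev); // may throw
          returnedEv = true;
          return ev;
        } finally {
          if (!returnedEv) {
            ev.release();
          }
        }
      }
    }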
@@ -269,8 +305,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     public PutAllEntryData(EntryEventImpl event) {
 
       this.key = event.getKey();
-      this.value = event.getRawNewValue();
-      Object oldValue = event.getRawOldValue();
+      this.value = event.getRawNewValueAsHeapObject();
+      Object oldValue = event.getRawOldValueAsHeapObject();
 
       if (oldValue == Token.NOT_AVAILABLE || Token.isRemoved(oldValue)) {
         this.oldValue = null;
@@ -808,6 +844,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     PutAllMessage msg = new PutAllMessage();
     msg.eventId = event.getEventId();
     msg.context = event.getContext();
+    msg.setFetchFromHDFS(event.isFetchFromHDFS());
+    msg.setPutDML(event.isPutDML());
     return msg;
   }
 
@@ -821,7 +859,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
   public PutAllPRMessage createPRMessagesNotifyOnly(int bucketId) {
     final EntryEventImpl event = getBaseEvent();
     PutAllPRMessage prMsg = new PutAllPRMessage(bucketId, putAllDataSize, true,
-        event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
+        event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false, false /*isPutDML*/);
     if (event.getContext() != null) {
       prMsg.setBridgeContext(event.getContext());
     }
@@ -850,7 +888,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       PutAllPRMessage prMsg = (PutAllPRMessage)prMsgMap.get(bucketId);
       if (prMsg == null) {
         prMsg = new PutAllPRMessage(bucketId.intValue(), putAllDataSize, false,
-            event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
+            event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isFetchFromHDFS(), event.isPutDML());
+        prMsg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
 
         // set dpao's context(original sender) into each PutAllMsg
         // dpao's event's context could be null if it's P2P putAll in PR
@@ -1026,6 +1065,11 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
 
     protected EventID eventId = null;
     
+    // By default, fetchFromHDFS is true.
+    private transient boolean fetchFromHDFS = true;
+    
+    private transient boolean isPutDML = false;
+
     protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
     protected static final short SKIP_CALLBACKS =
       (short)(HAS_BRIDGE_CONTEXT << 1);
@@ -1045,7 +1089,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     throws EntryNotFoundException
     {
       // Gester: We have to specify eventId for the message of MAP
-      EntryEventImpl event = new EntryEventImpl(
+      EntryEventImpl event = EntryEventImpl.create(
           rgn,
           Operation.PUTALL_UPDATE /* op */, null /* key */, null/* value */,
           this.callbackArg, true /* originRemote */, getSender());
@@ -1080,11 +1124,13 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
      *          the region the entry is put in
      */
     public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn,
-        boolean requiresRegionContext) {
+        boolean requiresRegionContext, boolean fetchFromHDFS, boolean isPutDML) {
       EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(), 
           this.context, rgn,
           requiresRegionContext, this.possibleDuplicate,
           this.needsRouting, this.callbackArg, true, skipCallbacks);
+      ev.setFetchFromHDFS(fetchFromHDFS);
+      ev.setPutDML(isPutDML);
       // we don't need to set old value here, because the msg is from remote. local old value will get from next step
       try {
         super.basicOperateOnRegion(ev, rgn);
@@ -1094,6 +1140,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
             rgn.getVersionVector().recordVersion(getSender(), ev.getVersionTag());
           }
         }
+        ev.release();
       }
     }
     
@@ -1119,10 +1166,12 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
         ((KeyWithRegionContext)key).setRegionContext(rgn);
       }
       EventID evId = entry.getEventID();
-      EntryEventImpl ev = new EntryEventImpl(rgn, entry.getOp(),
+      EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(),
           key, null/* value */, callbackArg,
           originRemote, sender, !skipCallbacks,
           evId);
+      boolean returnedEv = false;
+      try {
       if (context != null) {
         ev.context = context;
       }
@@ -1147,7 +1196,13 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
        * Setting tailKey for the secondary bucket here. Tail key was update by the primary.
        */
       ev.setTailKey(entry.getTailKey());
+      returnedEv = true;
       return ev;
+      } finally {
+        if (!returnedEv) {
+          ev.release();
+        }
+      }
     }
 
     @Override
@@ -1168,7 +1223,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
               logger.debug("putAll processing {} with {} sender={}", putAllData[i], putAllData[i].versionTag, sender);
             }
             putAllData[i].setSender(sender);
-            doEntryPut(putAllData[i], rgn, requiresRegionContext);
+            doEntryPut(putAllData[i], rgn, requiresRegionContext, fetchFromHDFS, isPutDML);
           }
         }
       }, ev.getEventId());
@@ -1212,7 +1267,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
 
     @Override
     public void toData(DataOutput out) throws IOException {
-
       super.toData(out);
       DataSerializer.writeObject(this.eventId, out);
       InternalDataSerializer.writeUnsignedVL(this.putAllDataSize, out);
@@ -1297,5 +1351,25 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       }
       return Arrays.asList(ops);
     }
+    
+    public void setFetchFromHDFS(boolean val) {
+      this.fetchFromHDFS = val;
+    }
+    
+    public void setPutDML(boolean val) {
+      this.isPutDML = val;
+    }
+    
+    @Override
+    protected short computeCompressedExtBits(short bits) {
+      bits = super.computeCompressedExtBits(bits);
+      if (fetchFromHDFS) {
+        bits |= FETCH_FROM_HDFS;
+      }
+      if (isPutDML) {
+        bits |= IS_PUT_DML;
+      }
+      return bits;
+    }
   }
 }
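
computeCompressedExtBits above is the sending half of the
FETCH_FROM_HDFS/IS_PUT_DML transport; the receiving half is the extBits
masking added to CacheOperationMessage.fromData earlier in this commit.
The encode/decode symmetry in isolation:

    // Stand-alone illustration of the extBits flag round trip.
    final class ExtBits {
      static final short FETCH_FROM_HDFS = 0x200;
      static final short IS_PUT_DML = 0x100;

      static short encode(short bits, boolean fetchFromHDFS, boolean isPutDML) {
        if (fetchFromHDFS) bits |= FETCH_FROM_HDFS;
        if (isPutDML) bits |= IS_PUT_DML;
        return bits;
      }

      static boolean fetchFromHDFS(short extBits) {
        return (extBits & FETCH_FROM_HDFS) != 0;
      }

      static boolean isPutDML(short extBits) {
        return (extBits & IS_PUT_DML) != 0;
      }
    }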

