geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [10/51] [partial] incubator-geode git commit: SGA #2
Date Fri, 03 Jul 2015 19:21:11 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCommitMessage.java
new file mode 100644
index 0000000..e57e33e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCommitMessage.java
@@ -0,0 +1,489 @@
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.CommitIncompleteException;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyException;
+import com.gemstone.gemfire.distributed.internal.ReplyMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.distributed.internal.ReplySender;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+
+/**
+ * Message asking a member to commit a distributed transaction it hosts.
+ * @author vivekb
+ */
+public final class DistTXCommitMessage extends TXMessage {
+
+  private static final Logger logger = LogService.getLogger();
+  private ArrayList<ArrayList<DistTxThinEntryState>> entryStateList = null;
+  
+  /** No-arg constructor required for deserialization; fromData fills in the state. */
+  public DistTXCommitMessage() {
+  }
+
+  public DistTXCommitMessage(TXId txUniqId,
+      InternalDistributedMember onBehalfOfClientMember,
+      ReplyProcessor21 processor) {
+    super(txUniqId.getUniqId(), onBehalfOfClientMember, processor); // entryStateList stays null until setEntryStateList is called
+  }
+
+  @Override
+  public int getDSFID() {
+    return DISTTX_COMMIT_MESSAGE; // constant is declared outside this class
+  }
+
+  @Override
+  protected boolean operateOnTx(TXId txId, DistributionManager dm)
+      throws RemoteOperationException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXCommitMessage.operateOnTx: Tx {}", txId);
+    }
+    // Commits the hosted tx, or reuses a recently completed result, then replies to the sender.
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    TXManagerImpl txMgr = cache.getTXMgr();
+    final TXStateProxy txStateProxy = txMgr.getTXState();
+    boolean commitSuccessful = false;
+    TXCommitMessage cmsg = null;
+    try {
+      // Commit only if this tx is not already recorded as recently completed
+      if (txMgr.isHostedTxRecentlyCompleted(txId)) {
+        if (logger.isDebugEnabled()) {
+          logger
+              .debug(
+                  "DistTXCommitMessage.operateOnTx: found a previously committed transaction:{}",
+                  txId);
+        }
+        cmsg = txMgr.getRecentlyCompletedMessage(txId); // reuse the recorded result instead of committing again
+        if (txMgr.isExceptionToken(cmsg)) {
+          throw txMgr.getExceptionForToken(cmsg, txId); // a recorded failure is rethrown; no reply is sent from here
+        }
+        commitSuccessful = true;
+      } else {
+        // [DISTTX] TODO - Handle scenarios of no txState
+        // If no TXState was created (e.g. due to only getEntry/size operations
+        // that don't start remote TX) skip the commit; the reply then carries null
+        if (txStateProxy != null) {
+          /*
+           * [DISTTX] TODO Should this exception be caught and sent over the
+           * wire like other exceptions, rather than thrown here?
+           * 
+           * It may be acceptable as is, since reaching it is a programming bug.
+           */
+          if (!txStateProxy.isDistTx()
+              || txStateProxy.isCreatedOnDistTxCoordinator()) {
+            throw new UnsupportedOperationInTransactionException(
+                LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                    "DistTXStateProxyImplOnDatanode", txStateProxy.getClass()
+                        .getSimpleName()));
+          }
+          if (logger.isDebugEnabled()) {
+            logger.debug("DistTXCommitMessage.operateOnTx Commiting {} "
+                + " incoming entryEventList:{} coming from {} ", txId,
+                DistTXStateProxyImplOnCoordinator
+                    .printEntryEventList(this.entryStateList), this.getSender()
+                    .getId());
+          }
+          
+          // Stamp every entry state with the sender's member id
+          String memberID = this.getSender().getId();
+          for (ArrayList<DistTxThinEntryState> esList : this.entryStateList) { // NOTE(review): NPE if entryStateList is still null — confirm senders always set it
+            for (DistTxThinEntryState es : esList) {
+              es.setMemberID(memberID);
+            }
+          }
+          
+          ((DistTXStateProxyImplOnDatanode) txStateProxy)
+              .populateDistTxEntryStates(this.entryStateList);
+          txStateProxy.setCommitOnBehalfOfRemoteStub(true);
+          txMgr.commit();
+          commitSuccessful = true;
+          cmsg = txStateProxy.getCommitMessage();
+        }
+      }
+    } finally {
+      if (commitSuccessful) { // hosted state is removed only after a successful or previously completed commit
+        txMgr.removeHostedTXState(txId);
+      }
+    }
+    DistTXCommitReplyMessage.send(getSender(), getProcessorId(), cmsg,
+        getReplySender(dm));
+
+    /*
+     * return false so there isn't another reply
+     */
+    return false;
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in); // superclass fields first, in the same order toData writes them
+    this.entryStateList = DataSerializer.readArrayList(in);
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out); // superclass fields first; fromData reads in the same order
+    DataSerializer.writeArrayList(entryStateList, out);
+  }
+
+  @Override
+  public boolean isTransactionDistributed() {
+    return true; // unconditional for this message type
+  }
+
+  @Override
+  public boolean canStartRemoteTransaction() {
+    return true; // unconditional for this message type
+  }
+
+  public void setEntryStateList(
+      ArrayList<ArrayList<DistTxThinEntryState>> entryStateList) {
+    this.entryStateList = entryStateList; // kept by reference (no copy); written by toData, consumed by operateOnTx
+  }
+  
+  /**
+   * This message is used for the reply to a Dist Tx Phase Two commit operation:
+   * a commit from a stub to the tx host. This is the reply to a
+   * {@link DistTXCommitMessage}.
+   * 
+   */
+  public static final class DistTXCommitReplyMessage extends
+      ReplyMessage {
+    private transient TXCommitMessage commitMessage;
+
+    /**
+     * Empty constructor to conform to DataSerializable interface
+     */
+    public DistTXCommitReplyMessage() {
+    }
+
+    public DistTXCommitReplyMessage(DataInput in) throws IOException,
+        ClassNotFoundException {
+      fromData(in); // populates this reply, including commitMessage, from the stream
+    }
+
+    private DistTXCommitReplyMessage(int processorId,
+        TXCommitMessage val) {
+      setProcessorId(processorId);
+      this.commitMessage = val; // may be null; send() passes it through unchecked
+    }
+
+    /** Replies of this type always report that they are processed in-line. */
+    @Override
+    public boolean getInlineProcess() {
+      return true;
+    }
+
+    /**
+     * Builds a reply carrying the given commit message, addresses it to the
+     * recipient and hands it to the reply sender. The recipient is asserted
+     * to be non-null first; note the assertion text says "NULL reply message"
+     * although the value checked is the recipient.
+     * 
+     * @param recipient
+     *          the member the reply is addressed to; asserted non-null
+     * @param processorId
+     *          the reply processor id copied into the reply
+     * @param val
+     *          the commit message to carry back; may be null
+     * @param replySender
+     *          the sender whose putOutgoing method receives the reply
+     */
+    public static void send(InternalDistributedMember recipient,
+        int processorId, TXCommitMessage val, ReplySender replySender)
+        throws RemoteOperationException {
+      Assert.assertTrue(recipient != null,
+          "DistTXCommitPhaseTwoReplyMessage NULL reply message");
+      DistTXCommitReplyMessage m = new DistTXCommitReplyMessage(
+          processorId, val);
+      m.setRecipient(recipient);
+      replySender.putOutgoing(m);
+    }
+
+    /**
+     * Hands this reply to the given reply processor. If the processor is null
+     * the reply is dropped, with a trace-level log when DM tracing is enabled.
+     * 
+     * @param dm the distribution manager that is processing the message.
+     * @param processor the reply processor to notify; may be null
+     */
+    @Override
+    public void process(final DM dm, ReplyProcessor21 processor) {
+      final long startTime = getTimestamp(); // NOTE(review): never read below — confirm it can be removed
+      if (logger.isTraceEnabled(LogMarker.DM)) {
+        logger
+            .trace(
+                LogMarker.DM,
+                "DistTXCommitPhaseTwoReplyMessage process invoking reply processor with processorId:{}",
+                this.processorId);
+      }
+
+      if (processor == null) {
+        if (logger.isTraceEnabled(LogMarker.DM)) {
+          logger.trace(LogMarker.DM,
+              "DistTXCommitPhaseTwoReplyMessage processor not found");
+        }
+        return;
+      }
+      processor.process(this);
+    }
+
+    @Override
+    public int getDSFID() {
+      return DISTTX_COMMIT_REPLY_MESSAGE; // constant is declared outside this class
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out); // superclass fields first; fromData reads in the same order
+      DataSerializer.writeObject(commitMessage, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException,
+        ClassNotFoundException {
+      super.fromData(in); // superclass fields first, in the same order toData writes them
+      this.commitMessage = (TXCommitMessage) DataSerializer.readObject(in);
+    }
+
+    /** Returns a short description: label, processor id and the reply's sender. */
+    @Override
+    public String toString() {
+      // unsynchronized StringBuilder suffices: the buffer never leaves this method
+      return new StringBuilder("DistTXCommitPhaseTwoReplyMessage ")
+          .append("processorid=").append(this.processorId)
+          .append(" reply to sender ").append(this.getSender()).toString();
+    }
+
+    public TXCommitMessage getCommitMessage() {
+      // may be null: operateOnTx sends a null commit message when no TXState exists
+      return commitMessage;
+    }
+  }
+
+  /**
+   * Reply processor which collects all CommitReplyExceptions for Dist Tx and emits
+   * a detailed failure exception if problems occur
+   * 
+   * @see TXCommitMessage.CommitReplyProcessor
+   * 
+   * [DISTTX] TODO see if need ReliableReplyProcessor21? departed members?
+   */
+  public static final class DistTxCommitReplyProcessor extends ReplyProcessor21 {
+    private HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap;
+    private Map<DistributedMember, TXCommitMessage> commitResponseMap;
+    private transient TXId txIdent = null;
+    
+    public DistTxCommitReplyProcessor(TXId txUniqId, DM dm, Set initMembers,
+        HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+      super(dm, initMembers);
+      this.msgMap = msgMap; // kept by reference; only passed on to handlePotentialCommitFailure
+      // [DISTTX] TODO Do we need synchronised map?
+      this.commitResponseMap = Collections
+          .synchronizedMap(new HashMap<DistributedMember, TXCommitMessage>());
+      this.txIdent = txUniqId; // handed to the collecting exception created in processException
+    }
+    
+    @Override
+    public void process(DistributionMessage msg) {
+      if (msg instanceof DistTXCommitReplyMessage) {
+        DistTXCommitReplyMessage reply = (DistTXCommitReplyMessage) msg;
+        this.commitResponseMap.put(reply.getSender(), reply.getCommitMessage()); // value may be null
+      }
+      super.process(msg); // every message, recorded or not, goes through the superclass handling
+    }
+  
+    public void waitForPrecommitCompletion() { // NOTE(review): named "Precommit" although this processor collects commit replies
+      try {
+        waitForRepliesUninterruptibly();
+      }
+      catch (DistTxCommitExceptionCollectingException e) {
+        e.handlePotentialCommitFailure(msgMap); // throws only if fatal exceptions were collected
+      }
+    }
+
+    @Override
+    protected void processException(DistributionMessage msg,
+        ReplyException ex) {
+      if (msg instanceof ReplyMessage) { // other message types are ignored here
+        synchronized(this) {
+          if (this.exception == null) {
+            // First failure: create the container that collects all exceptions
+            this.exception = new DistTxCommitExceptionCollectingException(txIdent);
+          }
+          DistTxCommitExceptionCollectingException cce = (DistTxCommitExceptionCollectingException) this.exception;
+          if (ex instanceof CommitReplyException) {
+            CommitReplyException cre = (CommitReplyException) ex;
+            cce.addExceptionsFromMember(msg.getSender(), cre.getExceptions()); // record the exceptions it carries
+          } else {
+            cce.addExceptionsFromMember(msg.getSender(), Collections.singleton(ex)); // record the exception itself
+          }
+        }
+      }
+    }
+
+    @Override
+    protected boolean stopBecauseOfExceptions() {
+      return false; // presumably keeps waiting for remaining replies after a failure — confirm in ReplyProcessor21
+    }
+    
+    public Set getCacheClosedMembers() {
+      if (this.exception != null) { // NOTE(review): the cast below assumes only processException sets this.exception — confirm
+        DistTxCommitExceptionCollectingException cce = (DistTxCommitExceptionCollectingException) this.exception;
+        return cce.getCacheClosedMembers(); // the collector's live set, not a copy
+      } else {
+        return Collections.EMPTY_SET; // no exception was recorded
+      }
+    }
+    public Set getRegionDestroyedMembers(String regionFullPath) {
+      if (this.exception != null) { // NOTE(review): the cast below assumes only processException sets this.exception — confirm
+        DistTxCommitExceptionCollectingException cce = (DistTxCommitExceptionCollectingException) this.exception;
+        return cce.getRegionDestroyedMembers(regionFullPath); // empty set if the path was never reported
+      } else {
+        return Collections.EMPTY_SET; // no exception was recorded
+      }
+    }
+    
+    public Map<DistributedMember, TXCommitMessage> getCommitResponseMap() {
+      return commitResponseMap; // the synchronized map itself, not a copy
+    }
+  }
+
+  /**
+   * An Exception that collects many remote CommitExceptions
+   * 
+   * @see TXCommitMessage.CommitExceptionCollectingException
+   */
+  public static class DistTxCommitExceptionCollectingException extends
+      ReplyException {
+    private static final long serialVersionUID = -2681117727592137893L;
+    /** Set of members that threw CacheClosedExceptions */
+    private final Set<InternalDistributedMember> cacheExceptions;
+    /** key=region path, value=Set of members */
+    private final Map<String, Set<InternalDistributedMember>> regionExceptions;
+    /** key=member, value=List of exceptions that were unexpected and caused the tx to fail */
+    private final Map fatalExceptions;
+
+    private final TXId id;
+
+    /*
+     * Starts empty. [DISTTX] TODO Actually handle exceptions like commit conflict, primary bucket moved, etc
+     */
+    public DistTxCommitExceptionCollectingException(TXId txIdent) {
+      this.cacheExceptions = new HashSet<InternalDistributedMember>();
+      this.regionExceptions = new HashMap<String, Set<InternalDistributedMember>>();
+      this.fatalExceptions = new HashMap(); // raw map: member -> List of exceptions
+      this.id = txIdent; // only used in the failure message
+    }
+
+    /**
+     * Throws a CommitIncompleteException describing every fatal exception,
+     * with stack traces, grouped by member; returns normally if there are none.
+     * 
+     * @param msgMap not read by the active code (only by the commented-out calls below)
+     */
+    public void handlePotentialCommitFailure(
+        HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+      if (fatalExceptions.size() > 0) {
+        StringBuffer errorMessage = new StringBuffer(
+            "Incomplete commit of transaction ").append(id).append(
+            ".  Caused by the following exceptions: ");
+        for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
+          Map.Entry me = (Map.Entry) i.next();
+          DistributedMember mem = (DistributedMember) me.getKey();
+          errorMessage.append(" From member: ").append(mem).append(" ");
+          List exceptions = (List) me.getValue();
+          for (Iterator ei = exceptions.iterator(); ei.hasNext();) {
+            Exception e = (Exception) ei.next();
+            errorMessage.append(e);
+            for (StackTraceElement ste : e.getStackTrace()) {
+              errorMessage.append("\n\tat ").append(ste); // one line per stack frame
+            }
+            if (ei.hasNext()) { // separator between exceptions of the same member
+              errorMessage.append("\nAND\n");
+            }
+          }
+          errorMessage.append(".");
+        }
+        throw new CommitIncompleteException(errorMessage.toString());
+      }
+
+      /* [DISTTX] TODO Not Sure if required; closed-cache and region-destroyed members are not acted on here */
+      // Mark any persistent members as offline
+      // handleClosedMembers(msgMap);
+      // handleRegionDestroyed(msgMap);
+    }
+
+    public Set<InternalDistributedMember> getCacheClosedMembers() {
+      return this.cacheExceptions; // members that reported a CancelException; live set, not a copy
+    }
+
+    public Set getRegionDestroyedMembers(String regionFullPath) {
+      Set members = (Set) this.regionExceptions.get(regionFullPath);
+      if (members == null) {
+        members = Collections.EMPTY_SET; // never return null: unknown paths yield the shared empty set
+      }
+      return members;
+    }
+
+    /**
+     * Sorts each exception into cacheExceptions, regionExceptions or
+     * fatalExceptions. Not synchronized here; processException calls it under a lock.
+     * @param member the member that reported the exceptions
+     * @param exceptions the exceptions to classify; each element is cast to Exception
+     */
+    public void addExceptionsFromMember(InternalDistributedMember member,
+        Set exceptions) {
+      for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
+        Exception ex = (Exception) iter.next();
+        if (ex instanceof CancelException) {
+          cacheExceptions.add(member); // only the member is kept, not the exception
+        } else if (ex instanceof RegionDestroyedException) {
+          String r = ((RegionDestroyedException) ex).getRegionFullPath();
+          Set<InternalDistributedMember> members = regionExceptions.get(r);
+          if (members == null) { // first report for this region path
+            members = new HashSet<InternalDistributedMember>();
+            regionExceptions.put(r, members);
+          }
+          members.add(member);
+        } else {
+          List el = (List) this.fatalExceptions.get(member);
+          if (el == null) { // first fatal exception from this member
+            el = new ArrayList(2);
+            this.fatalExceptions.put(member, el);
+          }
+          el.add(ex); // everything else is fatal and reported by handlePotentialCommitFailure
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCoordinatorInterface.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCoordinatorInterface.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCoordinatorInterface.java
new file mode 100644
index 0000000..be88e41
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXCoordinatorInterface.java
@@ -0,0 +1,64 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+
+/**
+ * [DISTTX] For Distributed Transaction
+ * 
+ * An entity that works as stub for DistTX on Coordinator.
+ * 
+ * @author vivekb
+ */
+public interface DistTXCoordinatorInterface extends TXStateInterface {
+  /**
+   * Response for Precommit. NOTE(review): presumably true means the precommit succeeded — confirm with implementations.
+   */
+  public boolean getPreCommitResponse()
+      throws UnsupportedOperationInTransactionException;
+
+  /**
+   * Response for Rollback. NOTE(review): presumably true means the rollback succeeded — confirm with implementations.
+   */
+  public boolean getRollbackResponse()
+      throws UnsupportedOperationInTransactionException;
+
+  public ArrayList<DistTxEntryEvent> getPrimaryTransactionalOperations()
+      throws UnsupportedOperationInTransactionException;
+
+  public void addSecondaryTransactionalOperations(DistTxEntryEvent dtop)
+      throws UnsupportedOperationInTransactionException;
+  
+  public void setPrecommitMessage(DistTXPrecommitMessage precommitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException;
+  
+  public void setCommitMessage(DistTXCommitMessage commitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException;
+  
+  public void setRollbackMessage(DistTXRollbackMessage rollbackMsg, DM dm)
+      throws UnsupportedOperationInTransactionException;
+  
+  public void gatherAffectedRegions(HashSet<LocalRegion> regionSet, // regionSet is an output collection, judging by the void return — confirm
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException;
+  
+  public void gatherAffectedRegionsName(
+      TreeSet<String> sortedRegionName, // likewise presumably filled in by the implementation — confirm
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException;
+  
+  public void finalCleanup();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXPrecommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXPrecommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXPrecommitMessage.java
new file mode 100644
index 0000000..5fe7ba0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXPrecommitMessage.java
@@ -0,0 +1,535 @@
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.CommitIncompleteException;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyException;
+import com.gemstone.gemfire.distributed.internal.ReplyMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.distributed.internal.ReplySender;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
+import com.gemstone.gemfire.internal.cache.locks.TXLockService;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+
+/**
+ * Message asking a member to precommit a distributed transaction it hosts.
+ * @author vivekb
+ */
+public final class DistTXPrecommitMessage extends TXMessage {
+
+  private static final Logger logger = LogService.getLogger();
+  ArrayList<DistTxEntryEvent> secondaryTransactionalOperations;
+
+  /** No-arg constructor required for deserialization; fromData fills in the state. */
+  public DistTXPrecommitMessage() {
+  }
+
+  public DistTXPrecommitMessage(TXId txUniqId,
+      InternalDistributedMember onBehalfOfClientMember,
+      ReplyProcessor21 processor) {
+    super(txUniqId.getUniqId(), onBehalfOfClientMember, processor); // secondaryTransactionalOperations stays null until set
+  }
+
+  @Override
+  public int getDSFID() {
+    return DISTTX_PRE_COMMIT_MESSAGE; // not declared in the visible part of this class; presumably inherited — confirm
+  }
+
+  @Override
+  protected boolean operateOnTx(TXId txId, DistributionManager dm)
+      throws RemoteOperationException {
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    TXManagerImpl txMgr = cache.getTXMgr();
+    // Precommits the hosted tx and replies with a success flag plus the collected entry states.
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "DistTXPrecommitMessage.operateOnTx: Tx {} with Secondaries List {}",
+          txId, this.secondaryTransactionalOperations);
+    }
+
+    // The tx should not have been committed before (checked only when assertions are enabled)
+    assert (!txMgr.isHostedTxRecentlyCompleted(txId));
+    // @see TXCommitMessage.process(DistributionManager)
+    TXLockService.createDTLS(); // fix bug 38843; no-op if already created
+    final TXStateProxy txStateProxy = txMgr.getTXState();
+    boolean precommitSuccess = true; // stays true when there is no TXState to precommit
+    TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap = new TreeMap<String, ArrayList<DistTxThinEntryState>>();
+    // [DISTTX] TODO - Test valid scenarios of null txState
+    // If no TXState was created (e.g. due to only getEntry/size operations
+    // that don't start remote TX) skip the precommit and reply with success
+    if (txStateProxy != null) {
+      if (!txStateProxy.isDistTx() || !txStateProxy.isTxStateProxy()
+          || txStateProxy.isCreatedOnDistTxCoordinator()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistTXStateProxyImplOnDatanode", txStateProxy.getClass()
+                    .getSimpleName()));
+      }
+
+      ((DistTXStateProxyImplOnDatanode) txStateProxy).setPreCommitMessage(this);
+
+      /*
+       * Perform precommit
+       * 
+       * [DISTTX] TODO Handle different exceptions here
+       */
+      txMgr.precommit();
+      precommitSuccess = ((DistTXStateProxyImplOnDatanode) txStateProxy)
+          .getPreCommitResponse();
+      if (precommitSuccess) {
+        precommitSuccess = ((DistTXStateProxyImplOnDatanode) txStateProxy)
+            .populateDistTxEntryStateList(entryStateSortedMap);
+        if (!precommitSuccess) {
+          entryStateSortedMap.clear(); // do not send partially collected entry states
+          if (logger.isDebugEnabled()) {
+            logger
+                .debug(
+                    "DistTXPrecommitMessage.operateOnTx: Tx {} Failed while creating response",
+                    txId);
+          }
+        }
+      }
+      else {
+        if (logger.isDebugEnabled()) {
+          logger
+              .debug(
+                  "DistTXPrecommitMessage.operateOnTx: Tx {} Failed while applying changes for replicates",
+                  txId);
+        }
+      }
+    }
+
+    // Send response: success flag (false if conflict) plus the entry-state lists in sorted key order
+    DistTxPrecommitResponse finalResponse = new DistTxPrecommitResponse(
+        precommitSuccess, new ArrayList<ArrayList<DistTxThinEntryState>>(
+            entryStateSortedMap.values()));
+    DistTXPrecommitReplyMessage.send(getSender(), getProcessorId(),
+        finalResponse, getReplySender(dm));
+
+    // return false so there isn't another reply
+    return false;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out); // superclass fields first; fromData reads in the same order
+    DataSerializer.writeArrayList(
+        (ArrayList<?>) secondaryTransactionalOperations, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in); // superclass fields first, in the same order toData writes them
+    this.secondaryTransactionalOperations = DataSerializer.readArrayList(in);
+  }
+
+  @Override
+  public boolean isTransactionDistributed() {
+    return true; // unconditional for this message type
+  }
+
+  @Override
+  public boolean canStartRemoteTransaction() {
+    return true; // unconditional for this message type
+  }
+
+  public ArrayList<DistTxEntryEvent> getSecondaryTransactionalOperations() {
+    return secondaryTransactionalOperations; // the internal list, not a copy; may be null
+  }
+
+  public void setSecondaryTransactionalOperations(
+      ArrayList<DistTxEntryEvent> secondaryTransactionalOperations) {
+    this.secondaryTransactionalOperations = secondaryTransactionalOperations; // kept by reference (no copy)
+  }
+
+  /**
+   * This is the reply to a {@link DistTXPrecommitMessage}.
+   */
+  public static final class DistTXPrecommitReplyMessage extends
+      ReplyMessage {
+    private transient DistTxPrecommitResponse commitResponse;
+
+    /**
+     * Empty constructor to conform to DataSerializable interface
+     */
+    public DistTXPrecommitReplyMessage() {
+    }
+
+    public DistTXPrecommitReplyMessage(DataInput in) throws IOException,
+        ClassNotFoundException {
+      fromData(in); // populates this reply, including commitResponse, from the stream
+    }
+
+    private DistTXPrecommitReplyMessage(int processorId, DistTxPrecommitResponse val) {
+      setProcessorId(processorId);
+      this.commitResponse = val; // stored as given; send() does not check it for null
+    }
+
+    /** Replies of this type always report that they are processed in-line. */
+    @Override
+    public boolean getInlineProcess() {
+      return true;
+    }
+
+    /**
+     * Builds a reply carrying the given precommit response, addresses it to
+     * the recipient and hands it to the reply sender. The recipient is asserted
+     * to be non-null first; note the assertion text says "NULL reply message"
+     * although the value checked is the recipient.
+     * 
+     * @param recipient
+     *          the member the reply is addressed to; asserted non-null
+     * @param processorId
+     *          the reply processor id copied into the reply
+     * @param val
+     *          the precommit response to carry back
+     * @param replySender
+     *          the sender whose putOutgoing method receives the reply
+     */
+    public static void send(InternalDistributedMember recipient,
+        int processorId, DistTxPrecommitResponse val, ReplySender replySender)
+        throws RemoteOperationException {
+      Assert.assertTrue(recipient != null,
+          "DistTXPhaseOneCommitReplyMessage NULL reply message");
+      DistTXPrecommitReplyMessage m = new DistTXPrecommitReplyMessage(
+          processorId, val);
+      m.setRecipient(recipient);
+      replySender.putOutgoing(m);
+    }
+
+    /**
+     * Hands this reply to the given reply processor. If the processor is null
+     * the reply is dropped, with a trace-level log when DM tracing is enabled.
+     * 
+     * @param dm the distribution manager that is processing the message.
+     * @param processor the reply processor to notify; may be null
+     */
+    @Override
+    public void process(final DM dm, ReplyProcessor21 processor) {
+      final long startTime = getTimestamp(); // NOTE(review): never read below — confirm it can be removed
+      if (logger.isTraceEnabled(LogMarker.DM)) {
+        logger
+            .trace(
+                LogMarker.DM,
+                "DistTXPhaseOneCommitReplyMessage process invoking reply processor with processorId:{}",
+                this.processorId);
+      }
+
+      if (processor == null) {
+        if (logger.isTraceEnabled(LogMarker.DM)) {
+          logger.trace(LogMarker.DM,
+              "DistTXPhaseOneCommitReplyMessage processor not found");
+        }
+        return;
+      }
+      processor.process(this);
+    }
+
+    @Override
+    public int getDSFID() {
+      return DISTTX_PRE_COMMIT_REPLY_MESSAGE; // not declared in the visible part of this file; presumably inherited — confirm
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out); // superclass fields first; fromData reads in the same order
+      DataSerializer.writeObject(commitResponse, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException,
+        ClassNotFoundException {
+      super.fromData(in); // superclass fields first, in the same order toData writes them
+      this.commitResponse = (DistTxPrecommitResponse) DataSerializer
+          .readObject(in);
+    }
+
+    /** Returns a short description: label, processor id and the reply's sender. */
+    @Override
+    public String toString() {
+      // the space after the label keeps it from fusing with "processorid=" in the output
+      return new StringBuilder("DistTXPhaseOneCommitReplyMessage ")
+          .append("processorid=").append(this.processorId)
+          .append(" reply to sender ").append(this.getSender()).toString();
+    }
+
+    public DistTxPrecommitResponse getCommitResponse() {
+      return commitResponse; // set by the private constructor or by fromData
+    }
+  }
+
+  /**
+   * Reply processor which collects all CommitReplyExceptions for Dist Tx and emits
+   * a detailed failure exception if problems occur. It also records the
+   * {@link DistTxPrecommitResponse} carried by each
+   * {@link DistTXPrecommitReplyMessage}, keyed by the sender of the reply.
+   * 
+   * @see TXCommitMessage.CommitReplyProcessor
+   * 
+   * [DISTTX] TODO see if need ReliableReplyProcessor21? departed members?
+   */
+  public static final class DistTxPrecommitReplyProcessor extends ReplyProcessor21 {
+    /** Passed to the collected exception when a pre-commit failure is evaluated */
+    private final HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap;
+    /** key=sender of a reply, value=the pre-commit response it carried */
+    private final Map<DistributedMember, DistTxPrecommitResponse> commitResponseMap;
+    /** Transaction id handed to the exception container */
+    private final transient TXId txIdent;
+
+    /**
+     * @param txUniqId
+     *          id of the transaction, used when an exception container is
+     *          created
+     * @param dm
+     *          passed through to the superclass
+     * @param initMembers
+     *          passed through to the superclass
+     * @param msgMap
+     *          handed to
+     *          {@link DistTxPrecommitExceptionCollectingException#handlePotentialCommitFailure(HashMap)}
+     *          if waiting for the replies fails
+     */
+    public DistTxPrecommitReplyProcessor(TXId txUniqId, DM dm, Set initMembers,
+        HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+      super(dm, initMembers);
+      this.msgMap = msgMap;
+      // [DISTTX] TODO Do we need synchronised map?
+      this.commitResponseMap = Collections
+          .synchronizedMap(new HashMap<DistributedMember, DistTxPrecommitResponse>());
+      this.txIdent = txUniqId;
+    }
+
+    /**
+     * Remembers the response of a {@link DistTXPrecommitReplyMessage} under
+     * its sender and then lets the superclass process the message.
+     */
+    @Override
+    public void process(DistributionMessage msg) {
+      if (msg instanceof DistTXPrecommitReplyMessage) {
+        DistTXPrecommitReplyMessage reply = (DistTXPrecommitReplyMessage) msg;
+        this.commitResponseMap.put(reply.getSender(), reply.getCommitResponse());
+      }
+      super.process(msg);
+    }
+
+    /**
+     * Waits uninterruptibly for the replies. If the wait fails with the
+     * collecting exception, that exception decides whether a
+     * {@code CommitIncompleteException} has to be thrown.
+     */
+    public void waitForPrecommitCompletion() {
+      try {
+        waitForRepliesUninterruptibly();
+      }
+      catch (DistTxPrecommitExceptionCollectingException e) {
+        e.handlePotentialCommitFailure(msgMap);
+      }
+    }
+
+    /**
+     * Adds the exception(s) reported by a reply to the exception container,
+     * creating the container on first use. Messages that are not
+     * {@link ReplyMessage}s are ignored.
+     */
+    @Override
+    protected void processException(DistributionMessage msg,
+        ReplyException ex) {
+      if (!(msg instanceof ReplyMessage)) {
+        return;
+      }
+      synchronized(this) {
+        if (this.exception == null) {
+          // Exception Container
+          this.exception = new DistTxPrecommitExceptionCollectingException(txIdent);
+        }
+        DistTxPrecommitExceptionCollectingException cce = (DistTxPrecommitExceptionCollectingException) this.exception;
+        if (ex instanceof CommitReplyException) {
+          // a CommitReplyException carries a whole set of exceptions
+          CommitReplyException cre = (CommitReplyException) ex;
+          cce.addExceptionsFromMember(msg.getSender(), cre.getExceptions());
+        } else {
+          cce.addExceptionsFromMember(msg.getSender(), Collections.singleton(ex));
+        }
+      }
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    protected boolean stopBecauseOfExceptions() {
+      return false;
+    }
+
+    /**
+     * @return the members recorded by the exception container as having
+     *         reported a {@code CancelException}; an empty set if no
+     *         exception has been collected
+     */
+    public Set getCacheClosedMembers() {
+      if (this.exception == null) {
+        return Collections.emptySet();
+      }
+      DistTxPrecommitExceptionCollectingException cce = (DistTxPrecommitExceptionCollectingException) this.exception;
+      return cce.getCacheClosedMembers();
+    }
+
+    /**
+     * @param regionFullPath
+     *          full path of the region of interest
+     * @return the members recorded by the exception container as having
+     *         reported a {@code RegionDestroyedException} for that region; an
+     *         empty set if no exception has been collected
+     */
+    public Set getRegionDestroyedMembers(String regionFullPath) {
+      if (this.exception == null) {
+        return Collections.emptySet();
+      }
+      DistTxPrecommitExceptionCollectingException cce = (DistTxPrecommitExceptionCollectingException) this.exception;
+      return cce.getRegionDestroyedMembers(regionFullPath);
+    }
+
+    /**
+     * @return the live (synchronized) map of the responses recorded so far,
+     *         keyed by the sender of the reply
+     */
+    public Map<DistributedMember, DistTxPrecommitResponse> getCommitResponseMap() {
+      return commitResponseMap;
+    }
+  }
+
+  /**
+   * An Exception that collects many remote CommitExceptions
+   * 
+   * @see TXCommitMessage.CommitExceptionCollectingException
+   */
+  public static class DistTxPrecommitExceptionCollectingException extends
+      ReplyException {
+    private static final long serialVersionUID = -2681117727592137893L;
+    /** Set of members that threw CacheClosedExceptions */
+    private final Set<InternalDistributedMember> cacheExceptions;
+    /** key=region path, value=Set of members */
+    private final Map<String, Set<InternalDistributedMember>> regionExceptions;
+    /** List of exceptions that were unexpected and caused the tx to fail */
+    private final Map fatalExceptions;
+
+    private final TXId id;
+
+    /*
+     * [DISTTX] TODO Actually handle exceptions like commit conflict, primary bucket moved, etc
+     */
+    public DistTxPrecommitExceptionCollectingException(TXId txIdent) {
+      this.cacheExceptions = new HashSet<InternalDistributedMember>();
+      this.regionExceptions = new HashMap<String, Set<InternalDistributedMember>>();
+      this.fatalExceptions = new HashMap();
+      this.id = txIdent;
+    }
+
+    /**
+     * Determine if the commit processing was incomplete, if so throw a detailed
+     * exception indicating the source of the problem
+     * 
+     * @param msgMap
+     */
+    public void handlePotentialCommitFailure(
+        HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+      if (fatalExceptions.size() > 0) {
+        StringBuffer errorMessage = new StringBuffer(
+            "Incomplete commit of transaction ").append(id).append(
+            ".  Caused by the following exceptions: ");
+        for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
+          Map.Entry me = (Map.Entry) i.next();
+          DistributedMember mem = (DistributedMember) me.getKey();
+          errorMessage.append(" From member: ").append(mem).append(" ");
+          List exceptions = (List) me.getValue();
+          for (Iterator ei = exceptions.iterator(); ei.hasNext();) {
+            Exception e = (Exception) ei.next();
+            errorMessage.append(e);
+            for (StackTraceElement ste : e.getStackTrace()) {
+              errorMessage.append("\n\tat ").append(ste);
+            }
+            if (ei.hasNext()) {
+              errorMessage.append("\nAND\n");
+            }
+          }
+          errorMessage.append(".");
+        }
+        throw new CommitIncompleteException(errorMessage.toString());
+      }
+
+      /* [DISTTX] TODO Not Sure if required */
+      // Mark any persistent members as offline
+      // handleClosedMembers(msgMap);
+      // handleRegionDestroyed(msgMap);
+    }
+
+    public Set<InternalDistributedMember> getCacheClosedMembers() {
+      return this.cacheExceptions;
+    }
+
+    public Set getRegionDestroyedMembers(String regionFullPath) {
+      Set members = (Set) this.regionExceptions.get(regionFullPath);
+      if (members == null) {
+        members = Collections.EMPTY_SET;
+      }
+      return members;
+    }
+
+    /**
+     * Protected by (this)
+     * 
+     * @param member
+     * @param exceptions
+     */
+    public void addExceptionsFromMember(InternalDistributedMember member,
+        Set exceptions) {
+      for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
+        Exception ex = (Exception) iter.next();
+        if (ex instanceof CancelException) {
+          cacheExceptions.add(member);
+        } else if (ex instanceof RegionDestroyedException) {
+          String r = ((RegionDestroyedException) ex).getRegionFullPath();
+          Set<InternalDistributedMember> members = regionExceptions.get(r);
+          if (members == null) {
+            members = new HashSet();
+            regionExceptions.put(r, members);
+          }
+          members.add(member);
+        } else {
+          List el = (List) this.fatalExceptions.get(member);
+          if (el == null) {
+            el = new ArrayList(2);
+            this.fatalExceptions.put(member, el);
+          }
+          el.add(ex);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Result of a pre-commit as it is sent between members: a success flag plus
+   * the list of entry-state lists that was supplied with it. Both parts are
+   * written and read through {@link DataSerializer}.
+   */
+  public static final class DistTxPrecommitResponse implements
+      DataSerializableFixedID {
+    /** The flag given to the constructor, or read from the stream */
+    private transient Boolean commitState;
+    /** The event list given to the constructor, or read from the stream */
+    private transient ArrayList<ArrayList<DistTxThinEntryState>> distTxEventList;
+
+    /** Default constructor for serialisation */
+    public DistTxPrecommitResponse() {
+    }
+
+    /**
+     * @param precommitSuccess
+     *          value later returned by {@link #getCommitState()}
+     * @param eventList
+     *          value later returned by {@link #getDistTxEntryEventList()}
+     */
+    public DistTxPrecommitResponse(boolean precommitSuccess,
+        ArrayList<ArrayList<DistTxThinEntryState>> eventList) {
+      this.distTxEventList = eventList;
+      this.commitState = Boolean.valueOf(precommitSuccess);
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public Version[] getSerializationVersions() {
+      return null;
+    }
+
+    /** @return the {@code DIST_TX_PRE_COMMIT_RESPONSE} id constant */
+    @Override
+    public int getDSFID() {
+      return DIST_TX_PRE_COMMIT_RESPONSE;
+    }
+
+    /**
+     * Writes the commit state first and the event list second.
+     */
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeBoolean(this.commitState, out);
+      DataSerializer.writeArrayList(this.distTxEventList, out);
+    }
+
+    /**
+     * Reads the commit state first and the event list second, i.e. in the
+     * order in which {@link #toData(DataOutput)} wrote them.
+     */
+    @Override
+    public void fromData(DataInput in) throws IOException,
+        ClassNotFoundException {
+      this.commitState = DataSerializer.readBoolean(in);
+      this.distTxEventList = DataSerializer.readArrayList(in);
+    }
+
+    /** @return the commit state held by this response */
+    public Boolean getCommitState() {
+      return this.commitState;
+    }
+
+    /** @return the event list held by this response */
+    public ArrayList<ArrayList<DistTxThinEntryState>> getDistTxEntryEventList() {
+      return this.distTxEventList;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXRollbackMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXRollbackMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXRollbackMessage.java
new file mode 100644
index 0000000..605f3bb
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXRollbackMessage.java
@@ -0,0 +1,494 @@
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.CommitIncompleteException;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ReplyException;
+import com.gemstone.gemfire.distributed.internal.ReplyMessage;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.distributed.internal.ReplySender;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.RemoteOperationMessage.RemoteOperationResponse;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+
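+/**
+ * Message that asks the receiving member to roll back a distributed
+ * transaction; the receiver answers with a {@link DistTXRollbackReplyMessage}
+ * that carries the outcome.
+ * 
+ * @author vivekb
+ * 
+ */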
+public final class DistTXRollbackMessage extends TXMessage {
+
+  private static final Logger logger = LogService.getLogger();
+
+  /** No-argument constructor, for deserialization */
+  public DistTXRollbackMessage() {
+  }
+
+  /**
+   * Creates a rollback message for the given transaction.
+   * 
+   * @param txUniqId
+   *          id of the transaction; its {@code getUniqId()} value is passed to
+   *          the superclass
+   * @param onBehalfOfClientMember
+   *          passed through to the superclass
+   * @param processor
+   *          passed through to the superclass
+   */
+  public DistTXRollbackMessage(TXId txUniqId,
+      InternalDistributedMember onBehalfOfClientMember,
+      ReplyProcessor21 processor) {
+    super(txUniqId.getUniqId(), onBehalfOfClientMember, processor);
+  }
+
+  /**
+   * Returns the {@code DISTTX_ROLLBACK_MESSAGE} id constant for this message
+   * type.
+   */
+  @Override
+  public int getDSFID() {
+    return DISTTX_ROLLBACK_MESSAGE;
+  }
+
+  /**
+   * Rolls back the current transaction of the transaction manager and reports
+   * the outcome to the sender through a {@link DistTXRollbackReplyMessage}.
+   * <p>
+   * The rollback is skipped, and {@code false} is reported, when the
+   * transaction manager says that {@code txId} was recently completed or when
+   * there is no current TXState. The hosted TXState for {@code txId} is
+   * removed in every case; if the rollback throws, no reply is sent from here.
+   * 
+   * @param txId
+   *          id of the transaction to roll back
+   * @param dm
+   *          used to obtain the reply sender
+   * @return always {@code false}, so there isn't another reply
+   */
+  @Override
+  protected boolean operateOnTx(TXId txId, DistributionManager dm)
+      throws RemoteOperationException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Dist TX: Rollback: {}", txId);
+    }
+
+    final TXManagerImpl txMgr = GemFireCacheImpl.getInstance().getTXMgr();
+    final TXStateProxy txState = txMgr.getTXState();
+    boolean rolledBack = false;
+    try {
+      // do the actual rollback, only if it was not done before
+      final boolean recentlyCompleted = txMgr.isHostedTxRecentlyCompleted(txId);
+      if (recentlyCompleted) {
+        if (logger.isDebugEnabled()) {
+          logger
+              .debug(
+                  "DistTXRollbackMessage.operateOnTx: found a previously committed transaction:{}",
+                  txId);
+        }
+        // TXCommitMessage cmsg = txMgr.getRecentlyCompletedMessage(txId);
+        // if (txMgr.isExceptionToken(cmsg)) {
+        // throw txMgr.getExceptionForToken(cmsg, txId);
+        // }
+      } else if (txState != null) {
+        // [DISTTX] TODO - Handle scenarios of no txState
+        // if no TXState was created (e.g. due to only getEntry/size operations
+        // that don't start remote TX) then ignore
+        txMgr.rollback();
+        rolledBack = true;
+      }
+    } finally {
+      txMgr.removeHostedTXState(txId);
+    }
+
+    DistTXRollbackReplyMessage.send(getSender(), getProcessorId(), rolledBack,
+        getReplySender(dm));
+
+    // the reply has just been sent above, so report "no further reply"
+    return false;
+  }
+
+  /**
+   * Reads the superclass state; this message currently adds no data of its
+   * own.
+   */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    // placeholder: additional fields of this message would be read here
+  }
+
+  /**
+   * Writes the superclass state; this message currently adds no data of its
+   * own.
+   */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    // placeholder: additional fields of this message would be written here
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isTransactionDistributed() {
+    return true;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean canStartRemoteTransaction() {
+    return true;
+  }
+
+  /**
+   * This is the reply to a {@link DistTXRollbackMessage}. It carries the
+   * rollback state reported by the replying member.
+   */
+  public static final class DistTXRollbackReplyMessage extends ReplyMessage {
+    /** Rollback state given to {@code send}, or read from the stream */
+    private transient Boolean rollbackState;
+
+    /**
+     * Empty constructor to conform to DataSerializable interface
+     */
+    public DistTXRollbackReplyMessage() {
+    }
+
+    /**
+     * Creates the reply by reading its state from {@code in}.
+     */
+    public DistTXRollbackReplyMessage(DataInput in) throws IOException,
+        ClassNotFoundException {
+      fromData(in);
+    }
+
+    private DistTXRollbackReplyMessage(int processorId, Boolean val) {
+      setProcessorId(processorId);
+      this.rollbackState = val;
+    }
+
+    /** Rollback replies are always processed in-line */
+    @Override
+    public boolean getInlineProcess() {
+      return true;
+    }
+
+    /**
+     * Builds a rollback reply and hands it to the reply sender.
+     * 
+     * @param recipient
+     *          the member the reply is addressed to; must not be
+     *          {@code null}
+     * @param processorId
+     *          the processor id the reply is tagged with
+     * @param val
+     *          the rollback state to report
+     * @param replySender
+     *          used to send the reply
+     */
+    public static void send(InternalDistributedMember recipient,
+        int processorId, Boolean val, ReplySender replySender)
+        throws RemoteOperationException {
+      Assert.assertTrue(recipient != null,
+          "DistTXRollbackReplyMessage NULL reply message");
+      DistTXRollbackReplyMessage m = new DistTXRollbackReplyMessage(
+          processorId, val);
+      m.setRecipient(recipient);
+      replySender.putOutgoing(m);
+    }
+
+    /**
+     * Processes this reply on the member that receives it by handing it to the
+     * supplied reply processor.
+     * 
+     * @param dm
+     *          the distribution manager that is processing the message.
+     * @param processor
+     *          the reply processor this reply is handed to; when it is
+     *          {@code null} the reply is dropped (after an optional trace
+     *          message) because there is nothing to deliver it to.
+     */
+    @Override
+    public void process(final DM dm, ReplyProcessor21 processor) {
+      if (logger.isTraceEnabled(LogMarker.DM)) {
+        logger
+            .trace(
+                LogMarker.DM,
+                "DistTXRollbackReplyMessage process invoking reply processor with processorId:{}",
+                this.processorId);
+      }
+
+      if (processor == null) {
+        if (logger.isTraceEnabled(LogMarker.DM)) {
+          logger.trace(LogMarker.DM,
+              "DistTXRollbackReplyMessage processor not found");
+        }
+        return;
+      }
+      processor.process(this);
+    }
+
+    /** @return the {@code DISTTX_ROLLBACK_REPLY_MESSAGE} id constant */
+    @Override
+    public int getDSFID() {
+      return DISTTX_ROLLBACK_REPLY_MESSAGE;
+    }
+
+    /**
+     * Writes the superclass state followed by the rollback state.
+     */
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      DataSerializer.writeBoolean(this.rollbackState, out);
+    }
+
+    /**
+     * Reads the superclass state followed by the rollback state, i.e. in the
+     * order in which {@link #toData(DataOutput)} wrote them.
+     */
+    @Override
+    public void fromData(DataInput in) throws IOException,
+        ClassNotFoundException {
+      super.fromData(in);
+      this.rollbackState = DataSerializer.readBoolean(in);
+    }
+
+    /**
+     * Describes this reply by its processor id and its sender.
+     */
+    @Override
+    public String toString() {
+      // The builder never leaves this method, so the unsynchronized
+      // StringBuilder is sufficient.
+      StringBuilder sb = new StringBuilder();
+      sb.append("DistTXRollbackReplyMessage ").append("processorid=")
+          .append(this.processorId).append(" reply to sender ")
+          .append(this.getSender());
+      return sb.toString();
+    }
+
+    /** @return the rollback state carried by this reply */
+    public Boolean getRollbackState() {
+      return rollbackState;
+    }
+  }
+
+  /**
+   * A processor to capture the value returned by
+   * {@link DistTXRollbackMessage.DistTXRollbackReplyMessage}
+   * 
+   */
+  public static class DistTXRollbackResponse extends RemoteOperationResponse {
+    /** Rollback state of the last rollback reply that was processed */
+    private volatile Boolean rollbackState;
+    /** Stat time taken in {@code process} when clock stats are enabled */
+    private volatile long start;
+
+    public DistTXRollbackResponse(InternalDistributedSystem ds, Set recipients) {
+      super(ds, recipients, true);
+    }
+
+    /**
+     * @return the rollback state of the last processed rollback reply, or
+     *         {@code null} if none has been processed
+     */
+    public Boolean getRollbackState() {
+      return this.rollbackState;
+    }
+
+    /**
+     * Records the stat time (when clock stats are enabled) and, for a
+     * {@link DistTXRollbackReplyMessage}, its rollback state, before letting
+     * the superclass process the message.
+     */
+    @Override
+    public void process(DistributionMessage msg) {
+      if (DistributionStats.enableClockStats) {
+        this.start = DistributionStats.getStatTime();
+      }
+      if (msg instanceof DistTXRollbackReplyMessage) {
+        // keep the outcome carried by the reply for the waiting caller
+        this.rollbackState = ((DistTXRollbackReplyMessage) msg)
+            .getRollbackState();
+      }
+      super.process(msg);
+    }
+
+    /**
+     * Waits for the reply and returns the captured rollback state.
+     * 
+     * @return the rollback state of the last processed rollback reply, or
+     *         {@code null} if none has been processed
+     * @throws RemoteOperationException
+     *           rethrown, after a debug log entry, when the wait fails with it
+     */
+    public Boolean waitForResponse() throws RemoteOperationException {
+      try {
+        // waitForRepliesUninterruptibly();
+        waitForCacheException();
+        if (DistributionStats.enableClockStats) {
+          getDistributionManager().getStats().incReplyHandOffTime(this.start);
+        }
+      } catch (RemoteOperationException roe) {
+        logger.debug(
+            "DistTXRollbackResponse got RemoteOperationException; rethrowing",
+            roe);
+        throw roe;
+      } catch (TransactionDataNotColocatedException notColocated) {
+        // Throw this up to user!
+        throw notColocated;
+      }
+      return this.rollbackState;
+    }
+  }
+  
+  /**
+   * Reply processor which collects all CommitReplyExceptions for Dist Tx and emits
+   * a detailed failure exception if problems occur
+   * 
+   * @see TXCommitMessage.CommitReplyProcessor
+   * 
+   * [DISTTX] TODO see if need ReliableReplyProcessor21? departed members?
+   */
+  public static final class DistTxRollbackReplyProcessor extends ReplyProcessor21 {
+    private HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap;
+    private Map<DistributedMember, Boolean> rollbackResponseMap;
+    private transient TXId txIdent = null;
+    
+    public DistTxRollbackReplyProcessor(TXId txUniqId, DM dm, Set initMembers,
+        HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+      super(dm, initMembers);
+      this.msgMap = msgMap;
+      // [DISTTX] TODO Do we need synchronised map?
+      this.rollbackResponseMap = Collections
+          .synchronizedMap(new HashMap<DistributedMember, Boolean>());
+      this.txIdent = txUniqId;
+    }
+    
+    @Override
+    public void process(DistributionMessage msg) {
+      if (msg instanceof DistTXRollbackReplyMessage) {
+        DistTXRollbackReplyMessage reply = (DistTXRollbackReplyMessage) msg;
+        this.rollbackResponseMap.put(reply.getSender(), reply.getRollbackState());
+      }
+      super.process(msg);
+    }
+  
+    public void waitForPrecommitCompletion() {
+      try {
+        waitForRepliesUninterruptibly();
+      }
+      catch (DistTxRollbackExceptionCollectingException e) {
+        e.handlePotentialCommitFailure(msgMap);
+      }
+    }
+
+    @Override
+    protected void processException(DistributionMessage msg,
+        ReplyException ex) {
+      if (msg instanceof ReplyMessage) {
+        synchronized(this) {
+          if (this.exception == null) {
+            // Exception Container
+            this.exception = new DistTxRollbackExceptionCollectingException(txIdent);
+          }
+          DistTxRollbackExceptionCollectingException cce = (DistTxRollbackExceptionCollectingException) this.exception;
+          if (ex instanceof CommitReplyException) {
+            CommitReplyException cre = (CommitReplyException) ex;
+            cce.addExceptionsFromMember(msg.getSender(), cre.getExceptions());
+          } else {
+            cce.addExceptionsFromMember(msg.getSender(), Collections.singleton(ex));
+          }
+        }
+      }
+    }
+
+    @Override
+    protected boolean stopBecauseOfExceptions() {
+      return false;
+    }
+    
+    public Set getCacheClosedMembers() {
+      if (this.exception != null) {
+        DistTxRollbackExceptionCollectingException cce = (DistTxRollbackExceptionCollectingException) this.exception;
+        return cce.getCacheClosedMembers();
+      } else {
+        return Collections.EMPTY_SET;
+      }
+    }
+    public Set getRegionDestroyedMembers(String regionFullPath) {
+      if (this.exception != null) {
+        DistTxRollbackExceptionCollectingException cce = (DistTxRollbackExceptionCollectingException) this.exception;
+        return cce.getRegionDestroyedMembers(regionFullPath);
+      } else {
+        return Collections.EMPTY_SET;
+      }
+    }
+    
+    public Map<DistributedMember, Boolean> getRollbackResponseMap() {
+      return rollbackResponseMap;
+    }
+  }
+
+  /**
+   * An Exception that collects many remote CommitExceptions
+   * 
+   * @see TXCommitMessage.CommitExceptionCollectingException
+   */
+  public static class DistTxRollbackExceptionCollectingException extends
+      ReplyException {
+    private static final long serialVersionUID = -2681117727592137893L;
+    /** Set of members that threw CacheClosedExceptions */
+    private final Set<InternalDistributedMember> cacheExceptions;
+    /** key=region path, value=Set of members */
+    private final Map<String, Set<InternalDistributedMember>> regionExceptions;
+    /** List of exceptions that were unexpected and caused the tx to fail */
+    private final Map fatalExceptions;
+
+    private final TXId id;
+
+    /*
+     * [DISTTX] TODO Actually handle exceptions like commit conflict, primary bucket moved, etc
+     */
+    public DistTxRollbackExceptionCollectingException(TXId txIdent) {
+      this.cacheExceptions = new HashSet<InternalDistributedMember>();
+      this.regionExceptions = new HashMap<String, Set<InternalDistributedMember>>();
+      this.fatalExceptions = new HashMap();
+      this.id = txIdent;
+    }
+
+    /**
+     * Determine if the commit processing was incomplete, if so throw a detailed
+     * exception indicating the source of the problem
+     * 
+     * @param msgMap
+     */
+    public void handlePotentialCommitFailure(
+        HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+      if (fatalExceptions.size() > 0) {
+        StringBuffer errorMessage = new StringBuffer(
+            "Incomplete commit of transaction ").append(id).append(
+            ".  Caused by the following exceptions: ");
+        for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
+          Map.Entry me = (Map.Entry) i.next();
+          DistributedMember mem = (DistributedMember) me.getKey();
+          errorMessage.append(" From member: ").append(mem).append(" ");
+          List exceptions = (List) me.getValue();
+          for (Iterator ei = exceptions.iterator(); ei.hasNext();) {
+            Exception e = (Exception) ei.next();
+            errorMessage.append(e);
+            for (StackTraceElement ste : e.getStackTrace()) {
+              errorMessage.append("\n\tat ").append(ste);
+            }
+            if (ei.hasNext()) {
+              errorMessage.append("\nAND\n");
+            }
+          }
+          errorMessage.append(".");
+        }
+        throw new CommitIncompleteException(errorMessage.toString());
+      }
+
+      /* [DISTTX] TODO Not Sure if required */
+      // Mark any persistent members as offline
+      // handleClosedMembers(msgMap);
+      // handleRegionDestroyed(msgMap);
+    }
+
+    public Set<InternalDistributedMember> getCacheClosedMembers() {
+      return this.cacheExceptions;
+    }
+
+    public Set getRegionDestroyedMembers(String regionFullPath) {
+      Set members = (Set) this.regionExceptions.get(regionFullPath);
+      if (members == null) {
+        members = Collections.EMPTY_SET;
+      }
+      return members;
+    }
+
+    /**
+     * Protected by (this)
+     * 
+     * @param member
+     * @param exceptions
+     */
+    public void addExceptionsFromMember(InternalDistributedMember member,
+        Set exceptions) {
+      for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
+        Exception ex = (Exception) iter.next();
+        if (ex instanceof CancelException) {
+          cacheExceptions.add(member);
+        } else if (ex instanceof RegionDestroyedException) {
+          String r = ((RegionDestroyedException) ex).getRegionFullPath();
+          Set<InternalDistributedMember> members = regionExceptions.get(r);
+          if (members == null) {
+            members = new HashSet();
+            regionExceptions.put(r, members);
+          }
+          members.add(member);
+        } else {
+          List el = (List) this.fatalExceptions.get(member);
+          if (el == null) {
+            el = new ArrayList(2);
+            this.fatalExceptions.put(member, el);
+          }
+          el.add(ex);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
new file mode 100644
index 0000000..ddc1a7f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
@@ -0,0 +1,665 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.gemstone.gemfire.InvalidDeltaException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.DiskAccessException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
+import com.gemstone.gemfire.cache.TransactionWriter;
+import com.gemstone.gemfire.cache.TransactionWriterException;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
+import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.DistTxKeyInfo;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * TxState on a datanode VM
+ * 
+ * @author vivekb
+ * 
+ */
+public class DistTXState extends TXState {
+
+  private boolean updatingTxStateDuringPreCommit = false;
+
+  /**
+   * Creates the state by passing both arguments straight through to the
+   * {@link TXState} constructor.
+   * 
+   * @param proxy
+   *          forwarded to the superclass
+   * @param onBehalfOfRemoteStub
+   *          forwarded to the superclass
+   */
+  public DistTXState(TXStateProxy proxy, boolean onBehalfOfRemoteStub) {
+    super(proxy, onBehalfOfRemoteStub);
+  }
+
+  /**
+   * Delegates to the superclass cleanup; no additional work specific to this
+   * class is performed yet.
+   */
+  @Override
+  protected void cleanup() {
+    super.cleanup();
+    // Do nothing for now
+  }
+
+  /**
+   * For every region in this TXState whose {@link TXRegionState} was not
+   * created during commit, asks the region's version vector for the next
+   * region version once per transactional entry and stores it in that
+   * entry's dist-tx entry state.
+   * <p>
+   * Per the original note, this is meant to generate versions on the primary
+   * only; region states created during commit are skipped. Regions without a
+   * version vector are left untouched.
+   * 
+   * @throws DiskAccessException
+   *           rethrown after being reported to the affected region through
+   *           {@code handleDiskAccessException}
+   */
+  public void updateRegionVersions() {
+    for (Map.Entry<LocalRegion, TXRegionState> me : this.regions.entrySet()) {
+      LocalRegion r = me.getKey();
+      TXRegionState txrs = me.getValue();
+
+      // Generate next region version only on the primary
+      if (txrs.isCreatedDuringCommit()) {
+        continue;
+      }
+
+      try {
+        for (Object key : txrs.getEntryKeys()) {
+          TXEntryState txes = txrs.getTXEntryState(key);
+          RegionVersionVector rvv = r.getVersionVector();
+          if (rvv != null) {
+            long v = rvv.getNextVersion();
+            txes.getDistTxEntryStates().setRegionVersion(v);
+            if (logger.isDebugEnabled()) {
+              // parameterized so that region name and key are properly
+              // separated from the surrounding text
+              logger.debug(
+                  "Set next region version to {} for region={} in TXEntryState for key={}",
+                  v, r.getName(), key);
+            }
+          }
+        }
+      } catch (DiskAccessException dae) {
+        // let the region react to the disk failure before propagating it
+        r.handleDiskAccessException(dae);
+        throw dae;
+      }
+    }
+  }
+  
+  /*
+   * Walks all region states of this transaction and, for each bucket region
+   * that is not an AbstractBucketRegionQueue and for which this member's
+   * bucket advisor reports primary, generates one tail key per transactional
+   * entry and stores it in the entry's dist-tx entry state.
+   * From there it is expected to be carried over to the secondaries in
+   * phase-2 commit, where both the primary and the secondaries should use
+   * this tail key to enqueue into parallel queues.
+   */
+  public void generateTailKeysForParallelDispatcherEvents() {
+    for (TXRegionState txrs : this.regions.values()) {
+      LocalRegion region = txrs.getRegion();
+
+      // Only bucket regions are of interest
+      if (!region.isUsedForPartitionedRegionBucket()) {
+        continue;
+      }
+
+      BucketRegion bucket = (BucketRegion) region;
+      // Skip queue buckets (non-hdfs buckets only) and non-primary buckets
+      if (bucket instanceof AbstractBucketRegionQueue
+          || !bucket.getBucketAdvisor().isPrimary()) {
+        continue;
+      }
+
+      // Generate a tail key for each entry
+      for (Object key : txrs.getEntryKeys()) {
+        TXEntryState txes = txrs.getTXEntryState(key);
+        long tailKey = bucket.generateTailKey();
+        txes.getDistTxEntryStates().setTailKey(tailKey);
+      }
+    }
+  }
+
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
+   * 
+   * Take Locks Does conflict check on primary ([DISTTX] TODO on primary only)
+   * Invoke TxWriter
+   * 
+   * Steps, in the order they are executed below:
+   * 1. Return immediately when this state is already closed.
+   * 2. Mark completion as started while holding completionGuard.
+   * 3. Reject the call with UnsupportedOperationInTransactionException when
+   *    this state acts on behalf of a remote stub but the proxy is not
+   *    committing on behalf of one.
+   * 4. Clean up non-dirty regions, then lock the bucket regions; a
+   *    PrimaryBucketException is rethrown as
+   *    TransactionDataRebalancedException with the original as its cause.
+   * 5. Run reserveAndCheck() only when no locks are held yet.
+   * 6. Run the internalAfterConflictCheck test hook, if one is set.
+   * 7. Update region versions and generate tail keys.
+   * 8. Give the TransactionWriter (if any, and if not fired before) the
+   *    chance to abort: a TransactionWriterException or any other Throwable
+   *    triggers cleanup() and is wrapped in CommitConflictException; a
+   *    VirtualMachineError is reported to SystemFailure and rethrown
+   *    without cleanup.
+   */
+  @Override
+  public void precommit() throws CommitConflictException,
+      UnsupportedOperationInTransactionException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXState.precommit transaction {} is closed {} ",
+          getTransactionId(), this.closed, new Throwable());
+    }
+
+    if (this.closed) {
+      return;
+    }
+
+    synchronized (this.completionGuard) {
+      this.completionStarted = true;
+    }
+
+    if (onBehalfOfRemoteStub && !proxy.isCommitOnBehalfOfRemoteStub()) {
+      throw new UnsupportedOperationInTransactionException(
+          LocalizedStrings.TXState_CANNOT_COMMIT_REMOTED_TRANSACTION
+              .toLocalizedString());
+    }
+
+    cleanupNonDirtyRegions();
+
+    /*
+     * Lock buckets so they can't be rebalanced then perform the conflict check
+     * to fix #43489
+     */
+    try {
+      lockBucketRegions();
+    } catch (PrimaryBucketException pbe) {
+      // not sure what to do here yet
+      RuntimeException re = new TransactionDataRebalancedException(
+          LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING
+              .toLocalizedString());
+      re.initCause(pbe);
+      throw re;
+    }
+
+    if (this.locks == null) {
+      reserveAndCheck();
+    }
+
+    // For internal testing
+    if (this.internalAfterConflictCheck != null) {
+      this.internalAfterConflictCheck.run();
+    }
+    
+    updateRegionVersions();
+    
+    generateTailKeysForParallelDispatcherEvents();
+    
+    /*
+     * If there is a TransactionWriter plugged in, we need to give it an
+     * opportunity to abort the transaction.
+     */
+    TransactionWriter writer = this.proxy.getTxMgr().getWriter();
+    if (!firedWriter && writer != null) {
+      try {
+        firedWriter = true;
+        writer.beforeCommit(getEvent());
+      } catch (TransactionWriterException twe) {
+        cleanup();
+        throw new CommitConflictException(twe);
+      } catch (VirtualMachineError err) {
+        // cleanup(); this allocates objects so I don't think we can do it -
+        // that leaves the TX open, but we are poison pilling so we should be
+        // ok??
+
+        SystemFailure.initiateFailure(err);
+        // If this ever returns, rethrow the error. We're poisoned
+        // now, so don't let this thread continue.
+        throw err;
+      } catch (Throwable t) {
+        cleanup(); // rollback the transaction!
+        // Whenever you catch Error or Throwable, you must also
+        // catch VirtualMachineError (see above). However, there is
+        // _still_ a possibility that you are dealing with a cascading
+        // error condition, so you also need to check to see if the JVM
+        // is still usable:
+        SystemFailure.checkFailure();
+        throw new CommitConflictException(t);
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
+   * 
+   * Apply changes release locks
+   * 
+   * Does nothing when this state is already closed. Otherwise generates the
+   * event offsets, attaches filter profile information, applies the changes
+   * to the cache and builds the commit message. Local locks are released
+   * and cleanup() runs whether or not applying the changes succeeded.
+   */
+  @Override
+  public void commit() throws CommitConflictException {
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "DistTXState.commit transaction {} is closed {} ",
+          getTransactionId(), this.closed, new Throwable());
+    }
+
+    if (this.closed) {
+      return;
+    }
+
+    try {
+      List/* <TXEntryStateWithRegionAndKey> */entries = generateEventOffsets();
+      if (logger.isDebugEnabled()) {
+        logger.debug("commit entries " + entries);
+      }
+      try {
+        attachFilterProfileInformation(entries);
+
+        // apply changes to the cache
+        applyChanges(entries);
+        // For internal testing
+        if (this.internalAfterApplyChanges != null) {
+          this.internalAfterApplyChanges.run();
+        }
+
+        this.commitMessage = buildCompleteMessage();
+
+      } finally {
+        // No TXCommitMessage is created locally in this method, so there are
+        // no view versions to release here; only the local locks are.
+        this.locks.releaseLocal();
+        // For internal testing
+        if (this.internalAfterReleaseLocalLocks != null) {
+          this.internalAfterReleaseLocalLocks.run();
+        }
+      }
+    } finally {
+      cleanup();
+    }
+  }
+
+  /**
+   * Delegates to the superclass rollback without adding any behaviour of its
+   * own.
+   */
+  @Override
+  public void rollback() {
+    super.rollback();
+    // Cleanup is called next
+  }
+
+  /**
+   * Applies the given transactional operations, in list order, to this
+   * state. While the operations are being applied the
+   * "updating TxState during pre-commit" flag is set; it is always cleared
+   * again before this method returns or throws.
+   * <p>
+   * Each event gets {@code sender} as its distributed member, is marked as
+   * not origin-remote, and is given a {@link DistTxKeyInfo} with the primary
+   * check disabled before it is applied. A {@link CacheWriterException},
+   * {@link PrimaryBucketException}, {@link InvalidDeltaException} or
+   * {@link DataLocationException} raised while applying an operation counts
+   * as a failure of that operation and is not propagated.
+   * <p>
+   * [DISTTX] TODO need to handle other operations; handle call back
+   * argument, version tag and other settings in PutMessage.
+   * 
+   * @param sender
+   *          member recorded on every event
+   * @param secondaryTransactionalOperations
+   *          operations to apply
+   * @return {@code true} when every operation was applied; {@code false} as
+   *         soon as one fails, in which case the remaining operations are
+   *         not attempted
+   */
+  protected boolean applyOpsOnRedundantCopy(DistributedMember sender,
+      ArrayList<DistTxEntryEvent> secondaryTransactionalOperations) {
+    try {
+      // Start TxState Update During PreCommit phase
+      setUpdatingTxStateDuringPreCommit(true);
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXState.applyOpOnRedundantCopy: size of "
+            + "secondaryTransactionalOperations = {}",
+            secondaryTransactionalOperations.size());
+      }
+
+      for (DistTxEntryEvent event : secondaryTransactionalOperations) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXState.applyOpOnRedundantCopy: processing dist "
+              + "tx operation {}", event);
+        }
+        event.setDistributedMember(sender);
+        event.setOriginRemote(false);
+
+        boolean applied;
+        String failureReason = null;
+        try {
+          // make sure the event carries a DistTxKeyInfo, then switch off
+          // the primary check on it
+          if (!event.getKeyInfo().isDistKeyInfo()) {
+            event.setKeyInfo(new DistTxKeyInfo(event.getKeyInfo()));
+          }
+          event.getKeyInfo().setCheckPrimary(false);
+
+          applied = applyIndividualOp(event);
+
+          if (!applied) { // make sure the region hasn't gone away
+            event.getRegion().checkReadiness();
+          }
+        } catch (CacheWriterException cwe) {
+          applied = false;
+          failureReason = "CacheWriterException";
+        } catch (PrimaryBucketException pbe) {
+          applied = false;
+          failureReason = "PrimaryBucketException";
+        } catch (InvalidDeltaException ide) {
+          applied = false;
+          failureReason = "InvalidDeltaException";
+        } catch (DataLocationException e) {
+          applied = false;
+          failureReason = "DataLocationException";
+        }
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXState.applyOpOnRedundantCopy {} ##op {},  "
+              + "##region {}, ##key {}",
+              (applied ? " sucessfully applied op " : " failed to apply op due to "+ failureReason),
+              event.getOperation(), event.getRegion().getName(), event.getKey());
+        }
+
+        if (!applied) {
+          // stop at the first failure; the finally block resets the flag
+          return false;
+        }
+      }
+      return true;
+    } finally {
+      // End TxState Update During PreCommit phase
+      setUpdatingTxStateDuringPreCommit(false);
+    }
+  }
+
+  /**
+   * Apply the individual tx op on secondary.
+   * <p>
+   * Dispatches on the event's operation:
+   * <ul>
+   * <li>update/create: {@code postPutAll} for a putAll, otherwise
+   * {@code putEntryOnRemote};</li>
+   * <li>destroy: {@code postRemoveAll} for a removeAll, otherwise
+   * {@code destroyOnRemote};</li>
+   * <li>invalidate: {@code invalidateOnRemote};</li>
+   * <li>anything else is logged at debug level and trips an assertion.</li>
+   * </ul>
+   * This implementation uses the {@code ...OnRemote} variants. The earlier
+   * note on this method suggests that {@code DistTXStateOnCoordinator} uses
+   * the local variants instead -- not visible from here, TODO confirm.
+   * 
+   * @param dtop
+   *          the operation to apply
+   * @return the result of {@code putEntryOnRemote} for a single
+   *         update/create; {@code true} in every other case
+   * @throws DataLocationException
+   *           propagated from the underlying operations
+   */
+  protected boolean applyIndividualOp(DistTxEntryEvent dtop)
+      throws DataLocationException {
+    if (dtop.op.isUpdate() || dtop.op.isCreate()) {
+      if (!dtop.op.isPutAll()) {
+        // overwriteDestroyed (last argument) is *not* used
+        return putEntryOnRemote(dtop, false /* ifNew */,
+            dtop.hasDelta() /* ifOld */, null /* expectedOldValue */,
+            false /* requireOldValue */, 0L /* lastModified */,
+            true /* overwriteDestroyed */);
+      }
+      assert (dtop.getPutAllOperation() != null);
+      // [DISTTX] TODO what do with versions next?
+      final VersionedObjectList putVersions = new VersionedObjectList(
+          dtop.getPutAllOperation().putAllDataSize, true,
+          dtop.region.concurrencyChecksEnabled);
+      postPutAll(dtop.getPutAllOperation(), putVersions, dtop.region);
+      return true;
+    }
+
+    if (dtop.op.isDestroy()) {
+      if (dtop.op.isRemoveAll()) {
+        assert (dtop.getRemoveAllOperation() != null);
+        // [DISTTX] TODO what do with versions next?
+        final VersionedObjectList removeVersions = new VersionedObjectList(
+            dtop.getRemoveAllOperation().removeAllDataSize, true,
+            dtop.region.concurrencyChecksEnabled);
+        postRemoveAll(dtop.getRemoveAllOperation(), removeVersions,
+            dtop.region);
+      } else {
+        destroyOnRemote(dtop, false /* TODO [DISTTX] */,
+            null /* TODO [DISTTX] */);
+      }
+      return true;
+    }
+
+    if (dtop.op.isInvalidate()) {
+      invalidateOnRemote(dtop, true /* TODO [DISTTX] */,
+          false /* TODO [DISTTX] */);
+      return true;
+    }
+
+    logger.debug("DistTXCommitPhaseOneMessage: unsupported TX operation {}",
+        dtop);
+    assert (false);
+    return true;
+  }
+  
+
+  /**
+   * @return the current value of the {@code updatingTxStateDuringPreCommit}
+   *         flag
+   */
+  public boolean isUpdatingTxStateDuringPreCommit() {
+    return updatingTxStateDuringPreCommit;
+  }
+
+  /**
+   * For Dist Tx
+   * 
+   * @param updatingTxState
+   *          if updating TxState during Commit Phase
+   */
+  private void setUpdatingTxStateDuringPreCommit(boolean updatingTxState)
+      throws UnsupportedOperationInTransactionException {
+    this.updatingTxStateDuringPreCommit = updatingTxState;
+    if (logger.isDebugEnabled()) {
+      logger
+          .debug(
+              "DistTXState setUpdatingTxStateDuringPreCommit incoming {} final {} ",
+              updatingTxState, this.updatingTxStateDuringPreCommit,
+              new Throwable()); // [DISTTX] TODO: Remove throwable
+    }
+  }
+
+  /**
+   * Returns the region state registered for {@code r}, creating and
+   * registering one when {@code readRegion} finds none. A newly created
+   * state is a {@link TXBucketRegionState} for a {@link BucketRegion} and a
+   * plain {@link TXRegionState} otherwise, and is flagged as "created during
+   * commit" according to the current {@code updatingTxStateDuringPreCommit}
+   * value.
+   * 
+   * @param r
+   *          region to look up
+   * @return the existing or newly created region state
+   */
+  @Override
+  public TXRegionState writeRegion(LocalRegion r) {
+    final TXRegionState existing = readRegion(r);
+    if (existing != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXState writeRegion flag {} region-state {} ",
+            this.updatingTxStateDuringPreCommit, existing);
+      }
+      return existing;
+    }
+
+    final TXRegionState created;
+    if (r instanceof BucketRegion) {
+      created = new TXBucketRegionState((BucketRegion) r, this);
+    } else {
+      created = new TXRegionState(r, this);
+    }
+    created.setCreatedDuringCommit(this.updatingTxStateDuringPreCommit);
+    this.regions.put(r, created);
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXState writeRegion flag {} new region-state {} ",
+          this.updatingTxStateDuringPreCommit, created);
+    }
+    return created;
+  }
+  
+  
+  /*
+   * [DISTTX] Note: This has been overridden here to associate DistKeyInfo
+   * with event to disable primary check(see DistKeyInfo.setCheckPrimary(false)) 
+   * when this gets called on secondary of a PR 
+   * 
+   * For TX this needs to be a PR passed in as region
+   * 
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.InternalDataView#postPutAll(com.gemstone
+   * .gemfire.internal.cache.DistributedPutAllOperation, java.util.Map,
+   * com.gemstone.gemfire.internal.cache.LocalRegion)
+   */
+  public void postPutAll(final DistributedPutAllOperation putallOp,
+      final VersionedObjectList successfulPuts, LocalRegion reg) {
+
+    final LocalRegion theRegion;
+    if (reg instanceof BucketRegion) {
+      theRegion = ((BucketRegion) reg).getPartitionedRegion();
+    } else {
+      theRegion = reg;
+    }
+    /*
+     * Don't fire events here.
+     */
+    /*
+     * We are on the data store, we don't need to do anything here. Commit will
+     * push them out.
+     */
+    /*
+     * We need to put this into the tx state.
+     */
+    theRegion.syncBulkOp(new Runnable() {
+      public void run() {
+        // final boolean requiresRegionContext =
+        // theRegion.keyRequiresRegionContext();
+        InternalDistributedMember myId = theRegion.getDistributionManager()
+            .getDistributionManagerId();
+        for (int i = 0; i < putallOp.putAllDataSize; ++i) {
+          EntryEventImpl ev = PutAllPRMessage.getEventFromEntry(theRegion,
+              myId, myId, i, putallOp.putAllData, false, putallOp
+                  .getBaseEvent().getContext(), false, !putallOp.getBaseEvent()
+                  .isGenerateCallbacks(), false);
+          try {
+//            ev.setPutAllOperation(putallOp);
+            
+            // below if condition returns true on secondary when TXState is
+            // updated in preCommit only on secondary
+            // In this case disable the primary check by calling
+            // distKeyInfo.setCheckPrimary(false);
+            if (isUpdatingTxStateDuringPreCommit()) {
+              KeyInfo keyInfo = ev.getKeyInfo();
+              DistTxKeyInfo distKeyInfo = new DistTxKeyInfo(keyInfo);
+              distKeyInfo.setCheckPrimary(false);
+              ev.setKeyInfo(distKeyInfo);
+            }
+            if (theRegion.basicPut(ev, false, false, null, false)) {
+              successfulPuts.addKeyAndVersion(putallOp.putAllData[i].key, null);
+            }
+          } finally {
+            ev.release();
+          }
+        }
+      }
+    }, putallOp.getBaseEvent().getEventId());
+
+  }
+  
+  /**
+   * Applies a removeAll to this TXState. For TX the operation has to run
+   * against the partitioned region, so a bucket region is replaced by its
+   * partitioned region first.
+   * <p>
+   * Every entry of the removeAll is turned into an event and destroyed in
+   * the region inside a single {@code syncBulkOp}. An
+   * {@link EntryNotFoundException} is ignored, and the key is added to
+   * {@code successfulOps} (with a null version) either way. Each event is
+   * released once it has been processed, mirroring what {@code postPutAll}
+   * does with its events.
+   * 
+   * @param op
+   *          the removeAll operation supplying the entries
+   * @param successfulOps
+   *          receives the key of every processed entry
+   * @param reg
+   *          target region, or one of its buckets
+   */
+  @Override
+  public void postRemoveAll(final DistributedRemoveAllOperation op,
+      final VersionedObjectList successfulOps, LocalRegion reg) {
+    final LocalRegion theRegion;
+    if (reg instanceof BucketRegion) {
+      theRegion = ((BucketRegion) reg).getPartitionedRegion();
+    } else {
+      theRegion = reg;
+    }
+    /*
+     * Don't fire events here. We are on the data store, we don't need to do
+     * anything here. Commit will push them out. We need to put this into the tx
+     * state.
+     */
+    theRegion.syncBulkOp(new Runnable() {
+      public void run() {
+        InternalDistributedMember myId = theRegion.getDistributionManager()
+            .getDistributionManagerId();
+        for (int i = 0; i < op.removeAllDataSize; ++i) {
+          EntryEventImpl ev = RemoveAllPRMessage.getEventFromEntry(theRegion,
+              myId, myId, i, op.removeAllData, false, op.getBaseEvent()
+                  .getContext(), false, !op.getBaseEvent()
+                  .isGenerateCallbacks());
+          try {
+            ev.setRemoveAllOperation(op);
+            // below if condition returns true on secondary when TXState is
+            // updated in preCommit only on secondary
+            // In this case disable the primary check by calling
+            // distKeyInfo.setCheckPrimary(false);
+            if (isUpdatingTxStateDuringPreCommit()) {
+              KeyInfo keyInfo = ev.getKeyInfo();
+              DistTxKeyInfo distKeyInfo = new DistTxKeyInfo(keyInfo);
+              distKeyInfo.setCheckPrimary(false);
+              ev.setKeyInfo(distKeyInfo);
+            }
+            try {
+              theRegion.basicDestroy(ev,
+                  true/* should we invoke cacheWriter? */, null);
+            } catch (EntryNotFoundException ignore) {
+              // nothing to destroy; the key is still reported below
+            }
+            successfulOps.addKeyAndVersion(op.removeAllData[i].key, null);
+          } finally {
+            // release the event on every path, as postPutAll does
+            ev.release();
+          }
+        }
+      }
+    }, op.getBaseEvent().getEventId());
+
+  }
+  
+  /**
+   * @return always {@code true} for this class
+   */
+  @Override
+  public boolean isDistTx() {
+    return true;
+  }
+  
+  /*
+   * Populate list of entry states for each region while replying precommit.
+   * 
+   * Only regions whose state was not created during commit contribute; for
+   * each of them the entry states are collected into a fresh list that is
+   * stored in entryStateSortedMap under the region's full path.
+   * 
+   * Returns false as soon as one region state fails to populate its list
+   * (the map then holds only the regions processed so far), true otherwise.
+   */
+  public boolean populateDistTxEntryStateList(
+      TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap) {
+    for (Map.Entry<LocalRegion, TXRegionState> me : this.regions.entrySet()) {
+      final String regionFullPath = me.getKey().getFullPath();
+      final TXRegionState txrs = me.getValue();
+
+      if (txrs.isCreatedDuringCommit()) {
+        if (logger.isDebugEnabled()) {
+          logger
+              .debug("DistTxState.populateDistTxEntryStateList Not adding entries for region "
+                  + regionFullPath);
+        }
+        continue;
+      }
+
+      ArrayList<DistTxThinEntryState> entryStateList = new ArrayList<DistTxThinEntryState>();
+      if (!txrs.populateDistTxEntryStateList(entryStateList)) {
+        if (logger.isDebugEnabled()) {
+          logger
+              .debug("DistTxState.populateDistTxEntryStateList Got exception for region "
+                  + regionFullPath);
+        }
+        return false;
+      }
+
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug("DistTxState.populateDistTxEntryStateList Adding entries "
+                + " with count="
+                + entryStateList.size()
+                + " for region "
+                + regionFullPath + " . Added list=" + entryStateList);
+      }
+      entryStateSortedMap.put(regionFullPath, entryStateList);
+    }
+    return true;
+  }
+  
+  /*
+   * Set list of entry states for each region while applying commit.
+   * 
+   * The region states that were created during commit are ordered by region
+   * full path; the n-th of them receives the n-th element of
+   * entryEventList.
+   */
+  public void setDistTxEntryStates(
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+    // order the relevant region states by full path
+    TreeMap<String, TXRegionState> regionSortedMap = new TreeMap<>();
+    for (TXRegionState candidate : this.regions.values()) {
+      if (candidate.isCreatedDuringCommit()) {
+        regionSortedMap.put(candidate.getRegion().getFullPath(), candidate);
+      }
+    }
+
+    int index = 0;
+    for (Entry<String, TXRegionState> sorted : regionSortedMap.entrySet()) {
+      ArrayList<DistTxThinEntryState> entryEvents = entryEventList.get(index);
+      index++;
+      if (logger.isDebugEnabled()) {
+        // note: the logged index is the already incremented one
+        logger.debug("DistTxState.setDistTxEntryStates For region="
+            + sorted.getKey() + " ,index=" + index + " ,entryEvents=("
+            + entryEvents.size() + ")=" + entryEvents + " ,regionSortedMap="
+            + regionSortedMap.keySet());
+      }
+      sorted.getValue().setDistTxEntryStates(entryEvents);
+    }
+  }
+}


Mime
View raw message