From: rvs@apache.org
To: commits@geode.incubator.apache.org
Reply-To: dev@geode.incubator.apache.org
Date: Fri, 03 Jul 2015 19:21:10 -0000
Subject: [09/51] [partial] incubator-geode git commit: SGA #2

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
new file mode 100644
index 0000000..7939b77
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateOnCoordinator.java
@@ -0,0 +1,400 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.TreeSet;
+
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * TxState on the TX coordinator, created when the coordinator is also a data
+ * node.
+ *
+ * @author shirishd
+ */
+public final class DistTXStateOnCoordinator extends DistTXState implements
+    DistTXCoordinatorInterface {
+
+  private ArrayList<DistTxEntryEvent> primaryTransactionalOperations = null;
+  private ArrayList<DistTxEntryEvent> secondaryTransactionalOperations = null;
+
+  private boolean preCommitResponse = false;
+  private boolean rollbackResponse = false;
+
+  public DistTXStateOnCoordinator(TXStateProxy proxy,
+      boolean onBehalfOfRemoteStub) {
+    super(proxy, onBehalfOfRemoteStub);
+    primaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
+    secondaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
+  }
+
+  public final ArrayList<DistTxEntryEvent> getPrimaryTransactionalOperations()
+      throws UnsupportedOperationInTransactionException {
+    return primaryTransactionalOperations;
+  }
+
+  private final void addPrimaryTransactionalOperations(DistTxEntryEvent dtop) {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove these
+      logger.debug("DistTXStateOnCoordinator.addPrimaryTransactionalOperations add "
+          + dtop + " ,stub before=" + this
+          + " ,isUpdatingTxStateDuringPreCommit="
+          + isUpdatingTxStateDuringPreCommit());
+    }
+    if (!isUpdatingTxStateDuringPreCommit()) {
+      primaryTransactionalOperations.add(dtop);
+      // [DISTTX] TODO Remove this
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateOnCoordinator.addPrimaryTransactionalOperations "
+            + " add primary op = {}", dtop);
+      }
+    }
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove these
+      logger.debug("DistTXStateOnCoordinator.addPrimaryTransactionalOperations stub after add = "
+          + this);
+    }
+  }
+
+  public final void addSecondaryTransactionalOperations(DistTxEntryEvent dtop)
+      throws UnsupportedOperationInTransactionException {
+    secondaryTransactionalOperations.add(dtop);
+  }
+
+  @Override
+  public void precommit() {
+    boolean retVal = applyOpsOnRedundantCopy(this.proxy.getCache()
+        .getDistributedSystem().getDistributedMember(),
+        this.secondaryTransactionalOperations);
+    if (retVal) {
+      super.precommit();
+    }
+    this.preCommitResponse = retVal; // apply only if no exception
+  }
+
+  @Override
+  public void rollback() {
+    super.rollback();
+    this.rollbackResponse = true; // true if no exception
+    // Cleanup is called next
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see com.gemstone.gemfire.internal.cache.TXStateStub#putEntry(com.gemstone.gemfire
+   * .internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object,
+   * boolean, long, boolean)
+   */
+  @Override
+  public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
+      Object expectedOldValue, boolean requireOldValue, long lastModified,
+      boolean overwriteDestroyed) {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove throwable
+      logger.debug("DistTXStateOnCoordinator.putEntry "
+          + event.getKeyInfo().getKey(), new Throwable());
+    }
+
+    boolean returnValue = super.putEntry(event, ifNew, ifOld, expectedOldValue,
+        requireOldValue, lastModified, overwriteDestroyed);
+
+    // the putAll event is already added in postPutAll; don't add individual
+    // events from the putAll operation again
+    if (!event.getOperation().isPutAll()) {
+      addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+    return returnValue;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see com.gemstone.gemfire.internal.cache.InternalDataView#putEntryOnRemote(com
+   * .gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean,
+   * java.lang.Object, boolean, long, boolean)
+   */
+  @Override
+  public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
+      boolean ifOld, Object expectedOldValue, boolean requireOldValue,
+      long lastModified, boolean overwriteDestroyed)
+      throws DataLocationException {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove throwable
+      logger.debug("DistTXStateOnCoordinator.putEntryOnRemote "
+          + event.getKeyInfo().getKey(), new Throwable());
+    }
+
+    boolean returnValue = super.putEntryOnRemote(event, ifNew, ifOld,
+        expectedOldValue, requireOldValue, lastModified, overwriteDestroyed);
+
+    // the putAll event is already added in postPutAll; don't add individual
+    // events from the putAll operation again
+    if (!event.getOperation().isPutAll()) {
+      addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+    return returnValue;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see com.gemstone.gemfire.internal.cache.TXStateInterface#destroyExistingEntry
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean,
+   * java.lang.Object)
+   */
+  public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
+      Object expectedOldValue) throws EntryNotFoundException {
+    // logger.debug("DistTXStateOnCoordinator.destroyExistingEntry", new Throwable());
+
+    super.destroyExistingEntry(event, cacheWrite, expectedOldValue);
+
+    // the removeAll event is already added in postRemoveAll; don't add
+    // individual events from the removeAll operation again
+    if (!event.getOperation().isRemoveAll()) {
+      addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see com.gemstone.gemfire.internal.cache.InternalDataView#destroyOnRemote(java
+   * .lang.Integer, com.gemstone.gemfire.internal.cache.EntryEventImpl,
+   * java.lang.Object)
+   */
+  public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite,
+      Object expectedOldValue) throws DataLocationException {
+    // logger.debug("DistTXStateOnCoordinator.destroyOnRemote", new Throwable());
+
+    super.destroyOnRemote(event, cacheWrite, expectedOldValue);
+
+    // the removeAll event is already added in postRemoveAll; don't add
+    // individual events from the removeAll operation again
+    if (!event.getOperation().isRemoveAll()) {
+      addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see com.gemstone.gemfire.internal.cache.TXStateInterface#invalidateExistingEntry
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
+   */
+  public void invalidateExistingEntry(EntryEventImpl event,
+      boolean invokeCallbacks, boolean forceNewEntry) {
+    // logger.debug("DistTXStateOnCoordinator.invalidateExistingEntry", new Throwable());
+
+    super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
+    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see com.gemstone.gemfire.internal.cache.InternalDataView#invalidateOnRemote
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
+   */
+  public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
+      boolean forceNewEntry) throws DataLocationException {
+    // logger.debug("DistTXStateOnCoordinator.invalidateOnRemote", new Throwable());
+    super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
+    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+  }
+
+  public void postPutAll(DistributedPutAllOperation putallOp,
+      VersionedObjectList successfulPuts, LocalRegion region) {
+    super.postPutAll(putallOp, successfulPuts, region);
+    EntryEventImpl event = EntryEventImpl.createPutAllEvent(putallOp, region,
+        Operation.PUTALL_CREATE, putallOp.getBaseEvent().getKey(),
+        putallOp.getBaseEvent().getValue());
+    event.setEventId(putallOp.getBaseEvent().getEventId());
+    DistTxEntryEvent dtop = new DistTxEntryEvent(event);
+    dtop.setPutAllOperation(putallOp);
+    addPrimaryTransactionalOperations(dtop);
+  }
+
+  public void postRemoveAll(DistributedRemoveAllOperation removeAllOp,
+      VersionedObjectList successfulOps, LocalRegion region) {
+    super.postRemoveAll(removeAllOp, successfulOps, region);
+    EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(removeAllOp,
+        region, removeAllOp.getBaseEvent().getKey());
+    event.setEventId(removeAllOp.getBaseEvent().getEventId());
+    DistTxEntryEvent dtop = new DistTxEntryEvent(event);
+    dtop.setRemoveAllOperation(removeAllOp);
+    addPrimaryTransactionalOperations(dtop);
+  }
+
+  @Override
+  public boolean getPreCommitResponse()
+      throws UnsupportedOperationInTransactionException {
+    return this.preCommitResponse;
+  }
+
+  @Override
+  public boolean getRollbackResponse()
+      throws UnsupportedOperationInTransactionException {
+    return this.rollbackResponse;
+  }
+
+  @Override
+  public void setPrecommitMessage(DistTXPrecommitMessage precommitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("setPrecommitMessage"));
+  }
+
+  @Override
+  public void setCommitMessage(DistTXCommitMessage commitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("setCommitMessage"));
+  }
+
+  @Override
+  public void setRollbackMessage(DistTXRollbackMessage rollbackMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_ROLLBACK_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("setRollbackMessage"));
+  }
+
+  @Override
+  public void gatherAffectedRegions(HashSet<LocalRegion> regionSet,
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException {
+    if (includePrimaryRegions) {
+      for (DistTxEntryEvent dtos : this.primaryTransactionalOperations) {
+        regionSet.add(dtos.getRegion());
+      }
+    }
+    if (includeRedundantRegions) {
+      for (DistTxEntryEvent dtos : this.secondaryTransactionalOperations) {
+        regionSet.add(dtos.getRegion());
+      }
+    }
+  }
+
+  @Override
+  public void gatherAffectedRegionsName(TreeSet<String> sortedRegionName,
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException {
+    if (includePrimaryRegions) {
+      gatherAffectedRegions(sortedRegionName,
+          this.primaryTransactionalOperations);
+    }
+    if (includeRedundantRegions) {
+      gatherAffectedRegions(sortedRegionName,
+          this.secondaryTransactionalOperations);
+    }
+  }
+
+  public static void gatherAffectedRegions(TreeSet<String> sortedRegionName,
+      ArrayList<DistTxEntryEvent> regionOps) {
+    for (DistTxEntryEvent dtos : regionOps) {
+      LocalRegion lr = dtos.getRegion();
+      if (lr instanceof PartitionedRegion) {
+        sortedRegionName.add(PartitionedRegionHelper.getBucketFullPath(
+            lr.getFullPath(), dtos.getKeyInfo().getBucketId()));
+      } else {
+        sortedRegionName.add(lr.getFullPath());
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected boolean applyIndividualOp(DistTxEntryEvent dtop)
+      throws DataLocationException {
+    boolean result = true;
+    if (dtop.op.isUpdate() || dtop.op.isCreate()) {
+      if (dtop.op.isPutAll()) {
+        assert (dtop.getPutAllOperation() != null);
+        // [DISTTX] TODO what to do with versions next?
+        final VersionedObjectList versions = new VersionedObjectList(
+            dtop.getPutAllOperation().putAllDataSize, true,
+            dtop.region.concurrencyChecksEnabled);
+        postPutAll(dtop.getPutAllOperation(), versions, dtop.region);
+      } else {
+        result = putEntry(dtop, false /* ifNew */,
+            dtop.hasDelta() /* ifOld */, null /* expectedOldValue */,
+            false /* requireOldValue */, 0L /* lastModified */,
+            true /* overwriteDestroyed - *not* used */);
+      }
+    } else if (dtop.op.isDestroy()) {
+      if (dtop.op.isRemoveAll()) {
+        assert (dtop.getRemoveAllOperation() != null);
+        // [DISTTX] TODO what to do with versions next?
+        final VersionedObjectList versions = new VersionedObjectList(
+            dtop.getRemoveAllOperation().removeAllDataSize, true,
+            dtop.region.concurrencyChecksEnabled);
+        postRemoveAll(dtop.getRemoveAllOperation(), versions, dtop.region);
+      } else {
+        destroyExistingEntry(dtop, false /* TODO [DISTTX] */,
+            null /* TODO [DISTTX] */);
+      }
+    } else if (dtop.op.isInvalidate()) {
+      invalidateExistingEntry(dtop, true /* TODO [DISTTX] */,
+          false /* TODO [DISTTX] */);
+    } else {
+      logger.debug("DistTXCommitPhaseOneMessage: unsupported TX operation {}",
+          dtop);
+      assert (false);
+    }
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(super.toString());
+    builder.append(" ,primary txOps=").append(this.primaryTransactionalOperations);
+    builder.append(" ,secondary txOps=").append(this.secondaryTransactionalOperations);
+    builder.append(" ,preCommitResponse=").append(this.preCommitResponse);
+    builder.append(" ,rollbackResponse=").append(this.rollbackResponse);
+    return builder.toString();
+  }
+
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return true;
+  }
+
+  @Override
+  public void finalCleanup() {
+    cleanup();
+  }
+}
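
The class above is one half of a two-phase protocol: precommit() applies the buffered secondary operations on the redundant copies and records the outcome in preCommitResponse, and only a successful precommit across all participants is followed by commit. A minimal, self-contained sketch of that control flow; TxParticipant is a hypothetical stand-in for DistTXCoordinatorInterface, and the real code routes these calls through messages and reply processors rather than invoking participants directly:

interface TxParticipant {
  boolean precommit(); // apply ops on the redundant copies, report success
  void commit();
  void rollback();
}

final class TwoPhaseSketch {
  /** Returns true only if every participant precommitted and was committed. */
  static boolean run(java.util.List<TxParticipant> participants) {
    boolean precommitOk = true;
    for (TxParticipant p : participants) {
      // AND-fold of the per-participant results, as in doPrecommit()'s finalResult
      precommitOk = precommitOk && p.precommit();
    }
    if (!precommitOk) {
      // mirrors commit()'s finally block: a failed precommit triggers rollback
      for (TxParticipant p : participants) {
        p.rollback();
      }
      return false;
    }
    for (TxParticipant p : participants) {
      p.commit();
    }
    return true;
  }
}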

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java
new file mode 100644
index 0000000..8d9ad09
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImpl.java
@@ -0,0 +1,32 @@
+package com.gemstone.gemfire.internal.cache;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * @author shirishd
+ */
+public abstract class DistTXStateProxyImpl extends TXStateProxyImpl {
+
+  protected static final Logger logger = LogService.getLogger();
+
+  public DistTXStateProxyImpl(TXManagerImpl managerImpl, TXId id,
+      InternalDistributedMember clientMember) {
+    super(managerImpl, id, clientMember);
+    // TODO Auto-generated constructor stub
+  }
+
+  public DistTXStateProxyImpl(TXManagerImpl managerImpl, TXId id, boolean isjta) {
+    super(managerImpl, id, isjta);
+    // TODO Auto-generated constructor stub
+  }
+
+  @Override
+  public boolean isDistTx() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
new file mode 100644
index 0000000..15291b0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -0,0 +1,1034 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.TransactionInDoubtException;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.DistTXPrecommitMessage.DistTxPrecommitResponse;
+import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData;
+import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.DistClientTXStateStub;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
+
+  /**
+   * A map of distributed system member to either {@link DistPeerTXStateStub} or
+   * {@link DistTXStateOnCoordinator} (in case the TX coordinator is also a data
+   * node).
+   */
+  protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals = new HashMap<>();
+  private HashMap<LocalRegion, DistributedMember> rrTargets;
+  private Set<DistributedMember> txRemoteParticipants = null; // other than local
+  protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
+
+  public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
+      InternalDistributedMember clientMember) {
+    super(managerImpl, id, clientMember);
+  }
+
+  public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
+      boolean isjta) {
+    super(managerImpl, id, isjta);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see com.gemstone.gemfire.internal.cache.TXStateInterface#commit()
+   *
+   * [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure
+   * these messages reach all
+   */
+  @Override
+  public void commit() throws CommitConflictException {
+    boolean preserveTx = false;
+    boolean precommitResult = false;
+    try {
+      // create a map of secondary (for PR) / replica (for RR) to stubs to send
+      // the commit message to those copies
+      HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals =
+          getSecondariesAndReplicasForTxOps();
+      // add it to the existing map and then send commit to all copies
+      target2realDeals.putAll(otherTargets2realDeals);
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.commit target2realDeals = "
+            + target2realDeals);
+      }
+
+      precommitResult = doPrecommit();
+      if (precommitResult) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.commit Going for commit ");
+        }
+        boolean phase2commitDone = doCommit();
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit "
+              + (phase2commitDone ? "Done" : "Failed"));
+        }
+        // [DISTTX] TODO Handle this exception well
+        if (!phase2commitDone) {
+          throw new TransactionInDoubtException(
+              LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER
+                  .toLocalizedString());
+        }
+      } else {
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.commit precommitResult = "
+              + precommitResult);
+        }
+      }
+    } catch (CommitConflictException e) {
+      throw e;
+    } catch (UnsupportedOperationInTransactionException e) {
+      // fix for #42490
+      preserveTx = true;
+      throw e;
+    } finally {
+      // [DISTTX] TODO What about rollback exceptions?
+      if (!precommitResult) {
+        rollback();
+      }
+
+      inProgress = preserveTx;
+      if (this.synchRunnable != null) {
+        this.synchRunnable.abort();
+      }
+    }
+  }
+
+  /**
+   * Creates a map of all secondaries (for PR) / replicas (for RR) to stubs, to
+   * send the commit message to those copies.
+   */
+  private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("getSecondariesAndReplicasForTxOps");
+    InternalDistributedMember currentNode = cache.getDistributedSystem()
+        .getDistributedMember();
+
+    HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals = new HashMap<>();
+    for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals.entrySet()) {
+      DistributedMember originalTarget = e.getKey();
+      DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
+
+      ArrayList<DistTxEntryEvent> primaryTxOps = distPeerTxStateStub
+          .getPrimaryTransactionalOperations();
+      for (DistTxEntryEvent dtop : primaryTxOps) {
+        LocalRegion lr = dtop.getRegion();
+        // replicas or secondaries
+        Set<InternalDistributedMember> otherNodes = null;
+        if (lr instanceof PartitionedRegion) {
+          Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop.getRegion())
+              .getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
+          allNodes.remove(originalTarget);
+          otherNodes = allNodes;
+        } else if (lr instanceof DistributedRegion) {
+          otherNodes = ((DistributedRegion) lr).getCacheDistributionAdvisor()
+              .adviseInitializedReplicates();
+          otherNodes.remove(originalTarget);
+        }
+
+        if (otherNodes != null) {
+          for (InternalDistributedMember dm : otherNodes) {
+            // check whether the target already exists due to another TX op on the node
+            DistTXCoordinatorInterface existingDistPeerTXStateStub = target2realDeals.get(dm);
+            if (existingDistPeerTXStateStub == null) {
+              existingDistPeerTXStateStub = secondaryTarget2realDeals.get(dm);
+              if (existingDistPeerTXStateStub == null) {
+                DistTXCoordinatorInterface newTxStub = null;
+                if (currentNode.equals(dm)) {
+                  // [DISTTX] TODO add a test case for this condition?
+                  newTxStub = new DistTXStateOnCoordinator(this, false);
+                } else {
+                  newTxStub = new DistPeerTXStateStub(this, dm, onBehalfOfClientMember);
+                }
+                newTxStub.addSecondaryTransactionalOperations(dtop);
+                secondaryTarget2realDeals.put(dm, newTxStub);
+              } else {
+                existingDistPeerTXStateStub.addSecondaryTransactionalOperations(dtop);
+              }
+            } else {
+              existingDistPeerTXStateStub.addSecondaryTransactionalOperations(dtop);
+            }
+          }
+        }
+      }
+    }
+    return secondaryTarget2realDeals;
+  }
+
+  @Override
+  public void rollback() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXStateProxyImplOnCoordinator.rollback Going for rollback ");
+    }
+
+    boolean finalResult = true;
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("Applying Dist TX Rollback");
+    final DM dm = cache.getDistributionManager();
+    try {
+      // Create TX participants
+      Set<DistributedMember> txRemoteParticipants = getTxRemoteParticipants(dm);
+
+      // create processor and rollback message
+      DistTXRollbackMessage.DistTxRollbackReplyProcessor processor =
+          new DistTXRollbackMessage.DistTxRollbackReplyProcessor(
+              this.getTxId(), dm, txRemoteParticipants, target2realDeals);
+      // TODO [DISTTX] what's the ack threshold?
+      processor.enableSevereAlertProcessing();
+      final DistTXRollbackMessage rollbackMsg = new DistTXRollbackMessage(
+          this.getTxId(), this.onBehalfOfClientMember, processor);
+
+      // send rollback message to remote nodes
+      for (DistributedMember remoteNode : txRemoteParticipants) {
+        DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
+        if (remoteTXStateStub.isTxState()) {
+          throw new UnsupportedOperationInTransactionException(
+              LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                  "DistPeerTXStateStub",
+                  remoteTXStateStub.getClass().getSimpleName()));
+        }
+        try {
+          remoteTXStateStub.setRollbackMessage(rollbackMsg, dm);
+          remoteTXStateStub.rollback();
+        } finally {
+          remoteTXStateStub.setRollbackMessage(null, null);
+          remoteTXStateStub.finalCleanup();
+        }
+        if (logger.isDebugEnabled()) { // TODO - make this trace level
+          logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = "
+              + remoteNode);
+        }
+      }
+
+      // Do rollback on the local node
+      DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
+      if (localTXState != null) {
+        if (!localTXState.isTxState()) {
+          throw new UnsupportedOperationInTransactionException(
+              LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                  "DistTXStateOnCoordinator",
+                  localTXState.getClass().getSimpleName()));
+        }
+        localTXState.rollback();
+        boolean localResult = localTXState.getRollbackResponse();
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.rollback local = "
+              + dm.getId() + " ,result= " + localResult
+              + " ,finalResult-old= " + finalResult);
+        }
+        finalResult = finalResult && localResult;
+      }
+
+      /*
+       * [DISTTX] TODO Any test hooks
+       */
+      // if (internalAfterIndividualSend != null) {
+      //   internalAfterIndividualSend.run();
+      // }
+
+      /*
+       * [DISTTX] TODO see how to handle exception
+       */
+
+      /*
+       * [DISTTX] TODO Any test hooks
+       */
+      // if (internalAfterIndividualCommitProcess != null) {
+      //   // Testing callback
+      //   internalAfterIndividualCommitProcess.run();
+      // }
+
+      { // Wait for results
+        dm.getCancelCriterion().checkCancelInProgress(null);
+        processor.waitForPrecommitCompletion();
+
+        // [DISTTX] TODO Handle stats
+        // dm.getStats().incCommitWaits();
+
+        Map<DistributedMember, Boolean> remoteResults = processor
+            .getRollbackResponseMap();
+        for (Entry<DistributedMember, Boolean> e : remoteResults.entrySet()) {
+          DistributedMember target = e.getKey();
+          Boolean remoteResult = e.getValue();
+          if (logger.isDebugEnabled()) { // TODO - make this trace level
+            logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = "
+                + target + " ,result= " + remoteResult
+                + " ,finalResult-old= " + finalResult);
+          }
+          finalResult = finalResult && remoteResult;
+        }
+      }
+
+    } finally {
+      inProgress = false;
+      if (this.synchRunnable != null) {
+        this.synchRunnable.abort();
+      }
+    }
+
+    /*
+     * [DISTTX] TODO Write similar method to take out exception
+     *
+     * [DISTTX] TODO Handle Reliable regions
+     */
+    // if (this.hasReliableRegions) {
+    //   checkDistributionReliability(distMap, processor);
+    // }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXStateProxyImplOnCoordinator.rollback finalResult= "
+          + finalResult);
+    }
+  }
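
getSecondariesAndReplicasForTxOps() above walks every primary operation, asks the region for all other members hosting a copy (bucket owners for a partitioned region, initialized replicates for a replicated one), and registers each such member exactly once. Reduced to plain collections, the shape of the computation looks like the following sketch, with member and operation types simplified to strings:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SecondaryFanOutSketch {
  /**
   * For each (primary owner -> its ops), register every other copy-holder of
   * each op once, accumulating the ops it must apply as a secondary.
   */
  static Map<String, List<String>> secondaries(
      Map<String, List<String>> primaryOps,   // member -> ops it is primary for
      Map<String, Set<String>> copyHolders) { // op -> all members holding a copy
    Map<String, List<String>> result = new HashMap<>();
    for (Map.Entry<String, List<String>> e : primaryOps.entrySet()) {
      for (String op : e.getValue()) {
        Set<String> others = new HashSet<>(
            copyHolders.getOrDefault(op, Collections.<String>emptySet()));
        others.remove(e.getKey()); // drop the primary itself
        for (String member : others) {
          result.computeIfAbsent(member, k -> new ArrayList<>()).add(op);
        }
      }
    }
    return result;
  }
}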
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public TXStateInterface getRealDeal(KeyInfo key, LocalRegion r) {
+    if (r != null) {
+      target = null;
+      // wait for the region to be initialized; fixes bug 44652
+      r.waitOnInitialization(r.initializationLatchBeforeGetInitialImage);
+      if (r instanceof PartitionedRegion) {
+        target = getOwnerForKey(r, key);
+      } else if (r instanceof BucketRegion) {
+        target = ((BucketRegion) r).getBucketAdvisor().getPrimary();
+        // target = r.getMyId();
+      } else { // replicated region
+        target = getRRTarget(key, r);
+      }
+      this.realDeal = target2realDeals.get(target);
+    }
+    if (this.realDeal == null) {
+      // assert (r != null);
+      if (r == null) { // TODO: stop gap to get tests working
+        this.realDeal = new DistTXStateOnCoordinator(this, false);
+        target = this.txMgr.getDM().getId();
+      } else {
+        // Code to keep going forward
+        if (r.hasServerProxy()) {
+          // TODO [DISTTX] See what we need for client?
+          this.realDeal = new DistClientTXStateStub(this, target, r);
+          if (r.scope.isDistributed()) {
+            if (txDistributedClientWarningIssued.compareAndSet(false, true)) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
+                  r.getFullPath()));
+            }
+          }
+        } else {
+          // (r != null) code block above
+          if (target == null || target.equals(this.txMgr.getDM().getId())) {
+            this.realDeal = new DistTXStateOnCoordinator(this, false);
+          } else {
+            this.realDeal = new DistPeerTXStateStub(this, target,
+                onBehalfOfClientMember);
+          }
+        }
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMgr:{} proxy {} target {}",
+            this.realDeal, this.txMgr.getDM().getId(), this, target,
+            new Throwable());
+      }
+      target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
+            + target2realDeals);
+      }
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
+            this.realDeal, this, target, target2realDeals);
+      }
+    }
+    return this.realDeal;
+  }
+
+  @Override
+  public TXStateInterface getRealDeal(DistributedMember t) {
+    assert t != null;
+    this.realDeal = target2realDeals.get(t);
+    if (this.realDeal == null) {
+      this.target = t;
+      this.realDeal = new DistPeerTXStateStub(this, target,
+          onBehalfOfClientMember);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
+            this.realDeal, this.txMgr.getDM().getId());
+      }
+      if (!this.realDeal.isDistTx()
+          || this.realDeal.isCreatedOnDistTxCoordinator()
+          || !this.realDeal.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistPeerTXStateStub",
+                this.realDeal.getClass().getSimpleName()));
+      }
+      target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
+            + target2realDeals);
+      }
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
+            this.realDeal, this, target, target2realDeals);
+      }
+    }
+    return this.realDeal;
+  }
+
+  /*
+   * [DISTTX] TODO Do some optimization
+   */
+  private DistributedMember getRRTarget(KeyInfo key, LocalRegion r) {
+    if (this.rrTargets == null) {
+      this.rrTargets = new HashMap<LocalRegion, DistributedMember>();
+    }
+    DistributedMember m = this.rrTargets.get(r);
+    if (m == null) {
+      m = getOwnerForKey(r, key);
+      this.rrTargets.put(r, m);
+    }
+    return m;
+  }
+
+  private Set<DistributedMember> getTxRemoteParticipants(final DM dm) {
+    if (this.txRemoteParticipants == null) {
+      Set<DistributedMember> txParticipants = target2realDeals.keySet();
+      this.txRemoteParticipants = new HashSet<DistributedMember>(txParticipants);
+      // Remove the local member from the remote participant list
+      this.txRemoteParticipants.remove(dm.getId());
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit txParticipants = "
+            + txParticipants + " ,txRemoteParticipants="
+            + this.txRemoteParticipants + " ,originator=" + dm.getId());
+      }
+    }
+    return txRemoteParticipants;
+  }
+
+  private boolean doPrecommit() {
+    boolean finalResult = true;
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("Applying Dist TX Precommit");
+    final DM dm = cache.getDistributionManager();
+
+    // Create TX participants
+    Set<DistributedMember> txParticipants = target2realDeals.keySet();
+    Set<DistributedMember> txRemoteParticipants = getTxRemoteParticipants(dm);
+
+    // Determine if the set of VMs for any of the Regions for this TX have
+    // changed
+    DistributedRegion dr = null;
+    HashSet<LocalRegion> affectedRegions = new HashSet<LocalRegion>();
+    for (DistTXCoordinatorInterface distTXStateStub : target2realDeals.values()) {
+      affectedRegions.clear();
+      distTXStateStub.gatherAffectedRegions(affectedRegions, true, false);
+      for (LocalRegion lr : affectedRegions) {
+        if (lr.getScope().isLocal()) {
+          continue;
+        }
+        // [DISTTX] TODO what about PR?
+        if (lr instanceof DistributedRegion) {
+          dr = (DistributedRegion) lr;
+          CacheDistributionAdvisor adv = dr.getCacheDistributionAdvisor();
+          Set<InternalDistributedMember> newRegionMemberView = adv.adviseTX();
+
+          if (!txParticipants.containsAll(newRegionMemberView)) {
+            logger.warn(LocalizedMessage.create(
+                LocalizedStrings.TXCommitMessage_NEW_MEMBERS_FOR_REGION_0_ORIG_LIST_1_NEW_LIST_2,
+                new Object[] { dr, txParticipants, newRegionMemberView }));
+          }
+        }
+      }
+    }
+
+    // create processor and precommit message
+    DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor =
+        new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(
+            this.getTxId(), dm, txRemoteParticipants, target2realDeals);
+    // TODO [DISTTX] what's the ack threshold?
+    processor.enableSevereAlertProcessing();
+    final DistTXPrecommitMessage precommitMsg = new DistTXPrecommitMessage(
+        this.getTxId(), this.onBehalfOfClientMember, processor);
+
+    // send precommit message to remote nodes
+    for (DistributedMember remoteNode : txRemoteParticipants) {
+      DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
+      if (remoteTXStateStub.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistPeerTXStateStub",
+                remoteTXStateStub.getClass().getSimpleName()));
+      }
+      try {
+        remoteTXStateStub.setPrecommitMessage(precommitMsg, dm);
+        remoteTXStateStub.precommit();
+      } finally {
+        remoteTXStateStub.setPrecommitMessage(null, null);
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = "
+            + remoteNode);
+      }
+    }
+
+    // Do precommit on the local node
+    TreeSet<String> sortedRegionName = new TreeSet<>();
+    DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
+    if (localTXState != null) {
+      if (!localTXState.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistTXStateOnCoordinator",
+                localTXState.getClass().getSimpleName()));
+      }
+      localTXState.precommit();
+      boolean localResult = localTXState.getPreCommitResponse();
+      TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap =
+          new TreeMap<String, ArrayList<DistTxThinEntryState>>();
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
+      if (localResult) {
+        localResult = ((DistTXStateOnCoordinator) localTXState)
+            .populateDistTxEntryStateList(entryStateSortedMap);
+        if (localResult) {
+          entryEventList = new ArrayList<ArrayList<DistTxThinEntryState>>(
+              entryStateSortedMap.values());
+          populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName);
+        }
+      }
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit local = "
+            + dm.getId() + " ,entryEventList="
+            + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
+            + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+            + localResult + " ,finalResult-old= " + finalResult);
+      }
+      finalResult = finalResult && localResult;
+    }
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualSend != null) {
+    //   internalAfterIndividualSend.run();
+    // }
+
+    /*
+     * [DISTTX] TODO see how to handle exception
+     */
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualCommitProcess != null) {
+    //   // Testing callback
+    //   internalAfterIndividualCommitProcess.run();
+    // }
+
+    { // Wait for results
+      dm.getCancelCriterion().checkCancelInProgress(null);
+      processor.waitForPrecommitCompletion();
+
+      // [DISTTX] TODO Handle stats
+      // dm.getStats().incCommitWaits();
+
+      Map<DistributedMember, DistTxPrecommitResponse> remoteResults =
+          processor.getCommitResponseMap();
+      for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults.entrySet()) {
+        DistributedMember target = e.getKey();
+        DistTxPrecommitResponse remoteResponse = e.getValue();
+        ArrayList<ArrayList<DistTxThinEntryState>> entryEventList =
+            remoteResponse.getDistTxEntryEventList();
+        populateEntryEventMap(target, entryEventList, sortedRegionName);
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = "
+              + target + " ,sortedRegions" + sortedRegionName
+              + " ,entryEventList=" + printEntryEventList(entryEventList)
+              + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap)
+              + " ,result= " + remoteResponse.getCommitState()
+              + " ,finalResult-old= " + finalResult);
+        }
+        finalResult = finalResult && remoteResponse.getCommitState();
+      }
+    }
+
+    /*
+     * [DISTTX] TODO Write similar method to take out exception
+     *
+     * [DISTTX] TODO Handle Reliable regions
+     */
+    // if (this.hasReliableRegions) {
+    //   checkDistributionReliability(distMap, processor);
+    // }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit finalResult= "
+          + finalResult);
+    }
+    return finalResult;
+  }
+
+  /*
+   * Handle the response of a precommit reply:
+   * go over the list of region versions for this target and fill the map.
+   */
+  private void populateEntryEventMap(DistributedMember target,
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList,
+      TreeSet<String> sortedRegionName) {
+    if (this.txEntryEventMap == null) {
+      this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>();
+    }
+
+    DistTXCoordinatorInterface distTxIface = target2realDeals.get(target);
+    if (distTxIface.getPrimaryTransactionalOperations() != null
+        && distTxIface.getPrimaryTransactionalOperations().size() > 0) {
+      sortedRegionName.clear();
+      distTxIface.gatherAffectedRegionsName(sortedRegionName, true, false);
+
+      if (sortedRegionName.size() != entryEventList.size()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "size of " + sortedRegionName.size() + " {" + sortedRegionName
+                    + "}" + " for target=" + target,
+                entryEventList.size() + " {" + entryEventList + "}"));
+      }
+
+      int index = 0;
+      // Get regions in the sorted order of their region path
+      for (String rName : sortedRegionName) {
+        txEntryEventMap.put(rName, entryEventList.get(index++));
+      }
+    }
+  }
+
+  /*
+   * Populate the list of regions for this target while sending commit messages.
+   */
+  private void populateEntryEventList(DistributedMember target,
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList,
+      TreeSet<String> sortedRegionMap) {
+    DistTXCoordinatorInterface distTxItem = target2realDeals.get(target);
+    sortedRegionMap.clear();
+    distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true);
+
+    // Get regions in the sorted order of their region path
+    entryEventList.clear();
+    for (String rName : sortedRegionMap) {
+      ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap.get(rName);
+      if (entryStates == null) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "entryStates for " + rName + " at target " + target, "null"));
+      }
+      entryEventList.add(entryStates);
+    }
+  }
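
populateEntryEventMap() and populateEntryEventList() above rely on one invariant: both sides enumerate the affected region names through a TreeSet, so the sorted iteration order is identical everywhere and the i-th per-region list on the wire unambiguously belongs to the i-th region name. A small sketch of that zip step, with version entries simplified to longs:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class SortedAlignmentSketch {
  /**
   * Pair the i-th per-region list with the i-th sorted region name. Because a
   * TreeSet iterates in sorted order on every node, the index alone
   * identifies the region.
   */
  static Map<String, List<Long>> zip(TreeSet<String> sortedRegionNames,
      List<List<Long>> perRegionVersions) {
    if (sortedRegionNames.size() != perRegionVersions.size()) {
      // mirrors the size check in populateEntryEventMap()
      throw new IllegalStateException("region/list size mismatch");
    }
    Map<String, List<Long>> byRegion = new HashMap<>();
    int i = 0;
    for (String name : sortedRegionNames) {
      byRegion.put(name, perRegionVersions.get(i++));
    }
    return byRegion;
  }
}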
+
+  /*
+   * [DISTTX] TODO - Handle result TXMessage
+   */
+  private boolean doCommit() {
+    boolean finalResult = true;
+    final GemFireCacheImpl cache = GemFireCacheImpl
+        .getExisting("Applying Dist TX Commit");
+    final DM dm = cache.getDistributionManager();
+
+    // Create TX participants
+    Set<DistributedMember> txRemoteParticipants = getTxRemoteParticipants(dm);
+
+    // create processor and commit message
+    DistTXCommitMessage.DistTxCommitReplyProcessor processor =
+        new DistTXCommitMessage.DistTxCommitReplyProcessor(
+            this.getTxId(), dm, txRemoteParticipants, target2realDeals);
+    // TODO [DISTTX] what's the ack threshold?
+    processor.enableSevereAlertProcessing();
+    final DistTXCommitMessage commitMsg = new DistTXCommitMessage(
+        this.getTxId(), this.onBehalfOfClientMember, processor);
+
+    // send commit message to remote nodes
+    ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>();
+    TreeSet<String> sortedRegionName = new TreeSet<>();
+    for (DistributedMember remoteNode : txRemoteParticipants) {
+      DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
+      if (remoteTXStateStub.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistPeerTXStateStub",
+                remoteTXStateStub.getClass().getSimpleName()));
+      }
+      try {
+        populateEntryEventList(remoteNode, entryEventList, sortedRegionName);
+        commitMsg.setEntryStateList(entryEventList);
+        remoteTXStateStub.setCommitMessage(commitMsg, dm);
+        remoteTXStateStub.commit();
+      } finally {
+        remoteTXStateStub.setCommitMessage(null, null);
+        remoteTXStateStub.finalCleanup();
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = "
+            + remoteNode + " ,sortedRegions=" + sortedRegionName
+            + " ,entryEventList=" + printEntryEventList(entryEventList)
+            + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap));
+      }
+    }
+
+    // Do commit on the local node
+    DistTXCoordinatorInterface localTXState = target2realDeals.get(dm.getId());
+    if (localTXState != null) {
+      if (!localTXState.isTxState()) {
+        throw new UnsupportedOperationInTransactionException(
+            LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+                "DistTXStateOnCoordinator",
+                localTXState.getClass().getSimpleName()));
+      }
+      populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
+      ((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
+      localTXState.commit();
+      TXCommitMessage localResultMsg = localTXState.getCommitMessage();
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.doCommit local = "
+            + dm.getId() + " ,sortedRegions=" + sortedRegionName
+            + " ,entryEventList=" + printEntryEventList(entryEventList)
+            + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap)
+            + " ,result= " + (localResultMsg != null)
+            + " ,finalResult-old= " + finalResult);
+      }
+      finalResult = finalResult && (localResultMsg != null);
+    }
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualSend != null) {
+    //   internalAfterIndividualSend.run();
+    // }
+
+    /*
+     * [DISTTX] TODO see how to handle exception
+     */
+
+    /*
+     * [DISTTX] TODO Any test hooks
+     */
+    // if (internalAfterIndividualCommitProcess != null) {
+    //   // Testing callback
+    //   internalAfterIndividualCommitProcess.run();
+    // }
+
+    { // Wait for results
+      dm.getCancelCriterion().checkCancelInProgress(null);
+      processor.waitForPrecommitCompletion();
+
+      // [DISTTX] TODO Handle stats
+      dm.getStats().incCommitWaits();
+
+      Map<DistributedMember, TXCommitMessage> remoteResults =
+          processor.getCommitResponseMap();
+      for (Entry<DistributedMember, TXCommitMessage> e : remoteResults.entrySet()) {
+        DistributedMember target = e.getKey();
+        TXCommitMessage remoteResultMsg = e.getValue();
+        if (logger.isDebugEnabled()) { // TODO - make this trace level
+          logger.debug("DistTXStateProxyImplOnCoordinator.doCommit got results from target = "
+              + target + " ,result= " + (remoteResultMsg != null)
+              + " ,finalResult-old= " + finalResult);
+        }
+        finalResult = finalResult && remoteResultMsg != null;
+      }
+    }
+
+    /*
+     * [DISTTX] TODO Write similar method to take out exception
+     *
+     * [DISTTX] TODO Handle Reliable regions
+     */
+    // if (this.hasReliableRegions) {
+    //   checkDistributionReliability(distMap, processor);
+    // }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistTXStateProxyImplOnCoordinator.doCommit finalResult= "
+          + finalResult);
+    }
+    return finalResult;
+  }
+
+  /**
+   * For distributed transactions, this divides the user's putAll operation
+   * into multiple per-bucket putAll ops (each with the entries to be put in
+   * that bucket) and then fires those using the appropriate TXStateStub (for
+   * the target that hosts the corresponding bucket).
+   */
+  @Override
+  public void postPutAll(DistributedPutAllOperation putallOp,
+      VersionedObjectList successfulPuts, LocalRegion region) {
+    if (putallOp.putAllData.length == 0) {
+      return;
+    }
+    if (region instanceof DistributedRegion) {
+      super.postPutAll(putallOp, successfulPuts, region);
+    } else {
+      region.getCancelCriterion().checkCancelInProgress(null); // fix for bug #43651
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll "
+            + "processing putAll op for region {}, size of putAllOp "
+            + "is {}", region, putallOp.putAllData.length);
+      }
+
+      // map of bucketId to the putAll op for that bucket
+      HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap =
+          new HashMap<Integer, DistributedPutAllOperation>();
+      // map of bucketId to the TXStateStub of the target that hosts that bucket
+      HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
+          new HashMap<Integer, DistTXCoordinatorInterface>();
+
+      // separate the putAll op per bucket
+      for (int i = 0; i < putallOp.putAllData.length; i++) {
+        // ...
+      }
+
+      // fire a putAll operation for each bucket using its TXStateStub
+      for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap.entrySet()) {
+        Integer bucketId = e.getKey();
+        DistTXCoordinatorInterface dtsi = e.getValue();
+        DistributedPutAllOperation putAllForBucket = bucketToPutallMap.get(bucketId);
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.postPutAll processing"
+              + " putAll for ##bucketId = {}, ##txStateStub = {}, "
+              + "##putAllOp = {}", bucketId, dtsi, putAllForBucket);
+        }
+        dtsi.postPutAll(putAllForBucket, successfulPuts, region);
+      }
+    }
+  }
+
+  /**
+   * For distributed transactions, this divides the user's removeAll operation
+   * into multiple per-bucket removeAll ops (each with the entries to be
+   * removed from that bucket) and then fires those using the appropriate
+   * TXStateStub (for the target that hosts the corresponding bucket).
+   */
+  @Override
+  public void postRemoveAll(DistributedRemoveAllOperation op,
+      VersionedObjectList successfulOps, LocalRegion region) {
+    if (op.removeAllData.length == 0) {
+      return;
+    }
+    if (region instanceof DistributedRegion) {
+      super.postRemoveAll(op, successfulOps, region);
+    } else {
+      region.getCancelCriterion().checkCancelInProgress(null); // fix for bug #43651
+      if (logger.isDebugEnabled()) {
+        logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll "
+            + "processing removeAll op for region {}, size of removeAll "
+            + "is {}", region, op.removeAllDataSize);
+      }
+
+      // map of bucketId to the removeAll op for that bucket
+      HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap =
+          new HashMap<Integer, DistributedRemoveAllOperation>();
+      // map of bucketId to the TXStateStub of the target that hosts that bucket
+      HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
+          new HashMap<Integer, DistTXCoordinatorInterface>();
+
+      // separate the removeAll op per bucket
+      for (int i = 0; i < op.removeAllData.length; i++) {
+        // ...
+      }
+
+      // fire a removeAll operation for each bucket using its TXStateStub
+      for (Entry<Integer, DistTXCoordinatorInterface> e : bucketToTxStateStubMap.entrySet()) {
+        Integer bucketId = e.getKey();
+        DistTXCoordinatorInterface dtsi = e.getValue();
+        DistributedRemoveAllOperation removeAllForBucket = bucketToRemoveAllMap.get(bucketId);
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
+              + " removeAll for ##bucketId = {}, ##txStateStub = {}, "
+              + "##removeAllOp = {}", bucketId, dtsi, removeAllForBucket);
+        }
+        dtsi.postRemoveAll(removeAllForBucket, successfulOps, region);
+      }
+
+    }
+  }
+
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return true;
+  }
+
+  public static String printEntryEventMap(
+      HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
+    StringBuilder str = new StringBuilder();
+    str.append(" (");
+    str.append(txRegionVersionsMap.size());
+    str.append(")=[ ");
+    for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap.entrySet()) {
+      str.append(" {").append(entry.getKey());
+      str.append(":").append("size(").append(entry.getValue().size()).append(")");
+      str.append("=").append(entry.getValue()).append("}, ");
+    }
+    str.append(" } ");
+    return str.toString();
+  }
+
+  public static String printEntryEventList(
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+    StringBuilder str = new StringBuilder();
+    str.append(" (");
+    str.append(entryEventList.size());
+    str.append(")=[ ");
+    for (ArrayList<DistTxThinEntryState> entry : entryEventList) {
+      str.append(" ( ");
+      str.append(entry.size());
+      str.append(" )={").append(entry);
+      str.append(" } ");
+    }
+    str.append(" ] ");
+    return str.toString();
+  }
+
+  /*
+   * Does not return null.
+   */
+  public DistributedMember getOwnerForKey(LocalRegion r, KeyInfo key) {
+    DistributedMember m = r.getOwnerForKey(key);
+    if (m == null) {
+      GemFireCacheImpl cache = GemFireCacheImpl.getExisting("getOwnerForKey");
+      m = cache.getDistributedSystem().getDistributedMember();
+    }
+    return m;
+  }
+}
\ No newline at end of file
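
The postPutAll()/postRemoveAll() overrides above follow the same pattern: group the operation's entries by bucket id, then fire one per-bucket operation at the stub for the member hosting that bucket. A sketch of the grouping step follows; the key-to-bucket mapping is abstracted into a function here, since the original splitting loop bodies did not survive in this archived copy of the patch:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

final class PerBucketSplitSketch {
  /**
   * Group a putAll's keys by bucket id; each group is then fired at the stub
   * of the member hosting that bucket. bucketOf stands in for the region's
   * key-to-bucket mapping.
   */
  static Map<Integer, List<String>> splitByBucket(List<String> keys,
      ToIntFunction<String> bucketOf) {
    Map<Integer, List<String>> perBucket = new HashMap<>();
    for (String key : keys) {
      perBucket.computeIfAbsent(bucketOf.applyAsInt(key), b -> new ArrayList<>())
          .add(key);
    }
    return perBucket;
  }
}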

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java
new file mode 100644
index 0000000..8750158
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXStateProxyImplOnDatanode.java
@@ -0,0 +1,112 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.TXEntryState.DistTxThinEntryState;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+public class DistTXStateProxyImplOnDatanode extends DistTXStateProxyImpl {
+
+  private DistTXPrecommitMessage preCommitMessage = null;
+  private boolean preCommitResponse = false;
+
+  public DistTXStateProxyImplOnDatanode(TXManagerImpl managerImpl, TXId id,
+      InternalDistributedMember clientMember) {
+    super(managerImpl, id, clientMember);
+  }
+
+  public DistTXStateProxyImplOnDatanode(TXManagerImpl managerImpl, TXId id,
+      boolean isjta) {
+    super(managerImpl, id, isjta);
+  }
+
+  @Override
+  public TXStateInterface getRealDeal(KeyInfo key, LocalRegion r) {
+    if (this.realDeal == null) {
+      this.realDeal = new DistTXState(this, false);
+      if (r != null) {
+        // wait for the region to be initialized; fixes bug 44652
+        r.waitOnInitialization(r.initializationLatchBeforeGetInitialImage);
+        target = r.getOwnerForKey(key);
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("Built a new DistTXState: {} me:{}", this.realDeal,
+            this.txMgr.getDM().getId());
+      }
+    }
+    return this.realDeal;
+  }
+
+  @Override
+  public TXStateInterface getRealDeal(DistributedMember t) {
+    assert t != null;
+    if (this.realDeal == null) {
+      this.target = t;
+      this.realDeal = new DistTXState(this, false);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Built a new DistTXState: {} me:{}", this.realDeal,
+            this.txMgr.getDM().getId());
+      }
+    }
+    return this.realDeal;
+  }
+
+  private DistTXState getRealDeal()
+      throws UnsupportedOperationInTransactionException {
+    if (this.realDeal == null || !this.realDeal.isDistTx()
+        || !this.realDeal.isTxState()
+        || this.realDeal.isCreatedOnDistTxCoordinator()) {
+      throw new UnsupportedOperationInTransactionException(
+          LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+              "DistTXStateOnDatanode",
+              this.realDeal != null
                  ? this.realDeal.getClass().getSimpleName() : "null"));
+    }
+    return (DistTXState) this.realDeal;
+  }
+
+  @Override
+  public void precommit() throws CommitConflictException,
+      UnsupportedOperationInTransactionException {
+    try {
+      DistTXState txState = getRealDeal();
+      boolean retVal = txState.applyOpsOnRedundantCopy(
+          this.preCommitMessage.getSender(),
+          this.preCommitMessage.getSecondaryTransactionalOperations());
+      if (retVal) {
+        setCommitOnBehalfOfRemoteStub(true);
+        txState.precommit();
+      }
+      this.preCommitResponse = retVal; // assign last, if no exception
+    } catch (UnsupportedOperationInTransactionException e) {
+      throw e;
+    } finally {
+      inProgress = true;
+    }
+  }
+
+  public void setPreCommitMessage(DistTXPrecommitMessage preCommitMessage) {
+    this.preCommitMessage = preCommitMessage;
+  }
+
+  public boolean getPreCommitResponse() {
+    return preCommitResponse;
+  }
+
+  /*
+   * Populate the list of versions for each region while replying to precommit.
+   */
+  public boolean populateDistTxEntryStateList(
+      TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap) {
+    return getRealDeal().populateDistTxEntryStateList(entryStateSortedMap);
+  }
+
+  public void populateDistTxEntryStates(
+      ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+    getRealDeal().setDistTxEntryStates(entryEventList);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index 9dc24af..d557be6 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -51,6 +51,8 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.CopyOnWriteHashSet;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllMessage;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
 import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
 import com.gemstone.gemfire.internal.cache.UpdateOperation.UpdateMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage;
@@ -63,6 +65,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.OffHeapReference;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
 
 /**
@@ -102,6 +106,50 @@ public abstract class DistributedCacheOperation {
    * @since 5.7
    */
   public static final byte DESERIALIZATION_POLICY_LAZY = (byte)2;
+
+  /**
+   * @param deserializationPolicy must be one of the following:
+   *          DESERIALIZATION_POLICY_NONE, DESERIALIZATION_POLICY_EAGER,
+   *          DESERIALIZATION_POLICY_LAZY.
+   */
+  public static void writeValue(final byte deserializationPolicy,
+      final Object vObj, final byte[] vBytes, final DataOutput out)
+      throws IOException {
+    if (vObj != null) {
+      if (deserializationPolicy == DESERIALIZATION_POLICY_EAGER) {
+        // for DESERIALIZATION_POLICY_EAGER avoid extra byte array serialization
+        DataSerializer.writeObject(vObj, out);
+      } else if (deserializationPolicy == DESERIALIZATION_POLICY_NONE) {
+        // We only have NONE with a vObj when vObj is off-heap and not serialized.
+        OffHeapReference ohref = (OffHeapReference) vObj;
+        assert !ohref.isSerialized();
+        DataSerializer.writeByteArray(ohref.getValueAsHeapByteArray(), out);
+      } else { // LAZY
+        // TODO OFFHEAP MERGE: cache the oldValue that is serialized here
+        // into the event
+        DataSerializer.writeObjectAsByteArray(vObj, out);
+      }
+    } else {
+      if (deserializationPolicy == DESERIALIZATION_POLICY_EAGER) {
+        // object is already in serialized form in the byte array.
+        // So just write the bytes to the stream.
+        // fromData will call readObject which will deserialize to object form.
+        out.write(vBytes);
+      } else {
+        DataSerializer.writeByteArray(vBytes, out);
+      }
+    }
+  }
+
+  // static values for oldValueIsObject
+  public static final byte VALUE_IS_BYTES = 0;
+  public static final byte VALUE_IS_SERIALIZED_OBJECT = 1;
+  public static final byte VALUE_IS_OBJECT = 2;
+
+  /**
+   * Given a VALUE_IS_* constant, convert and return the corresponding
+   * DESERIALIZATION_POLICY_* constant.
+   */
+  public static byte valueIsToDeserializationPolicy(boolean oldValueIsSerialized) {
+    if (!oldValueIsSerialized) return DESERIALIZATION_POLICY_NONE;
+    if (CachedDeserializableFactory.preferObject()) return DESERIALIZATION_POLICY_EAGER;
+    return DESERIALIZATION_POLICY_LAZY;
+  }
 
   public final static byte DESERIALIZATION_POLICY_NUMBITS =
       DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
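
valueIsToDeserializationPolicy() above maps the boolean "is the value serialized" plus the cache-wide preferObject() setting onto the three DESERIALIZATION_POLICY_* constants consumed by writeValue(). A standalone sketch of that mapping; the NONE/EAGER byte values are assumed here, since only LAZY = 2 is visible in this hunk:

final class DeserializationPolicySketch {
  // Byte values assumed for the sketch; only LAZY = 2 appears in the diff above.
  static final byte NONE = 0, EAGER = 1, LAZY = 2;

  /** The decision implemented by valueIsToDeserializationPolicy(). */
  static byte policyFor(boolean valueIsSerialized, boolean preferObject) {
    if (!valueIsSerialized) {
      return NONE; // raw bytes stay raw on the wire
    }
    return preferObject ? EAGER : LAZY; // deserialize now vs. on demand
  }

  public static void main(String[] args) {
    System.out.println(policyFor(false, false)); // NONE
    System.out.println(policyFor(true, true));   // EAGER
    System.out.println(policyFor(true, false));  // LAZY
  }
}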
+ */ + public static byte valueIsToDeserializationPolicy(boolean oldValueIsSerialized) { + if (!oldValueIsSerialized) return DESERIALIZATION_POLICY_NONE; + if (CachedDeserializableFactory.preferObject()) return DESERIALIZATION_POLICY_EAGER; + return DESERIALIZATION_POLICY_LAZY; + } + public final static byte DESERIALIZATION_POLICY_NUMBITS = DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY); @@ -792,7 +840,7 @@ public abstract class DistributedCacheOperation { public static abstract class CacheOperationMessage extends SerialDistributionMessage implements MessageWithReply, - DirectReplyMessage, ReliableDistributionData { + DirectReplyMessage, ReliableDistributionData, OldValueImporter { protected final static short POSSIBLE_DUPLICATE_MASK = POS_DUP; protected final static short OLD_VALUE_MASK = @@ -810,6 +858,10 @@ public abstract class DistributedCacheOperation { private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400; + protected final static short FETCH_FROM_HDFS = 0x200; + + protected final static short IS_PUT_DML = 0x100; + public boolean needsRouting; protected String regionPath; @@ -906,20 +958,18 @@ public abstract class DistributedCacheOperation { * @param event the entry event that contains the old value */ public void appendOldValueToMessage(EntryEventImpl event) { - Object val = event.getRawOldValue(); - if (val == Token.NOT_AVAILABLE || - val == Token.REMOVED_PHASE1 || - val == Token.REMOVED_PHASE2 || - val == Token.DESTROYED || - val == Token.TOMBSTONE) { - return; - } - if (val instanceof CachedDeserializable) { - val = ((CachedDeserializable)val).getValue(); + { + @Unretained Object val = event.getRawOldValue(); + if (val == null || + val == Token.NOT_AVAILABLE || + val == Token.REMOVED_PHASE1 || + val == Token.REMOVED_PHASE2 || + val == Token.DESTROYED || + val == Token.TOMBSTONE) { + return; + } } - this.oldValue = val; - this.hasOldValue = true; - this.oldValueIsSerialized = (val instanceof byte[]); + event.exportOldValue(this); } /** @@ -977,6 +1027,12 @@ public abstract class DistributedCacheOperation { public boolean containsRegionContentChange() { return true; } + + protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) { + Assert.assertTrue(this.regionPath != null, "regionPath was null"); + GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem()); + return gfc.getRegionByPathForProcessing(this.regionPath); + } @Override protected final void process(final DistributionManager dm) { @@ -998,12 +1054,7 @@ public abstract class DistributedCacheOperation { return; } - Assert.assertTrue(this.regionPath != null, "regionPath was null"); - - GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm - .getSystem()); - final LocalRegion lclRgn = gfc - .getRegionByPathForProcessing(this.regionPath); + final LocalRegion lclRgn = getLocalRegionForProcessing(dm); sendReply = false; basicProcess(dm, lclRgn); } catch (CancelException e) { @@ -1084,6 +1135,7 @@ public abstract class DistributedCacheOperation { } event = createEvent(rgn); + try { boolean isEntry = event.getOperation().isEntry(); if (isEntry && this.possibleDuplicate) { @@ -1105,6 +1157,11 @@ public abstract class DistributedCacheOperation { } sendReply = operateOnRegion(event, dm) && sendReply; + } finally { + if (event instanceof EntryEventImpl) { + ((EntryEventImpl) event).release(); + } + } } catch (RegionDestroyedException e) { this.closed = true; if (logger.isDebugEnabled()) { @@ -1287,12 +1344,15 @@ public abstract class DistributedCacheOperation 
{ this.hasDelta = (bits & DELTA_MASK) != 0; this.hasOldValue = (bits & OLD_VALUE_MASK) != 0; if (this.hasOldValue) { - // below boolean is not strictly required, but this is for compatibility - // with SQLFire code which writes as byte here to indicate whether - // oldValue is an object, serialized object or byte[] - in.readByte(); + byte b = in.readByte(); + if (b == 0) { + this.oldValueIsSerialized = false; + } else if (b == 1) { + this.oldValueIsSerialized = true; + } else { + throw new IllegalStateException("expected 0 or 1"); + } this.oldValue = DataSerializer.readByteArray(in); - this.oldValueIsSerialized = true; } boolean hasFilterInfo = (bits & FILTER_INFO_MASK) != 0; this.needsRouting = (bits & NEEDS_ROUTING_MASK) != 0; @@ -1306,6 +1366,10 @@ public abstract class DistributedCacheOperation { } if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) { this.inhibitAllNotifications = true; + if (this instanceof PutAllMessage) { + ((PutAllMessage) this).setFetchFromHDFS((extBits & FETCH_FROM_HDFS) != 0); + ((PutAllMessage) this).setPutDML((extBits & IS_PUT_DML) != 0); + } } } @@ -1332,18 +1396,20 @@ public abstract class DistributedCacheOperation { DataSerializer.writeObject(this.callbackArg, out); } if (this.hasOldValue) { - // below boolean is not strictly required, but this is for compatibility - // with SQLFire code which writes as byte here to indicate whether - // oldValue is an object, serialized object or byte[] out.writeByte(this.oldValueIsSerialized ? 1 : 0); // the receiving side expects that the old value will have been serialized // as a byte array - if (this.oldValueIsSerialized) { - DataSerializer.writeByteArray((byte[])this.oldValue, out); - } - else { - DataSerializer.writeObjectAsByteArray(this.oldValue, out); + final byte policy = valueIsToDeserializationPolicy(this.oldValueIsSerialized); + final Object vObj; + final byte[] vBytes; + if (!this.oldValueIsSerialized && this.oldValue instanceof byte[]) { + vObj = null; + vBytes = (byte[])this.oldValue; + } else { + vObj = this.oldValue; + vBytes = null; } + writeValue(policy, vObj, vBytes, out); } if (this.filterRouting != null) { InternalDataSerializer.invokeToData(this.filterRouting, out); @@ -1427,6 +1493,50 @@ public abstract class DistributedCacheOperation { public void setSendDelta(boolean sendDelta) { this.sendDelta = sendDelta; } + + @Override + public boolean prefersOldSerialized() { + return true; + } + + @Override + public boolean isUnretainedOldReferenceOk() { + return true; + } + + @Override + public boolean isCachedDeserializableValueOk() { + return false; + } + + @Override + public void importOldObject(Object ov, boolean isSerialized) { + this.oldValueIsSerialized = isSerialized; + this.oldValue = ov; + this.hasOldValue = true; + } + + @Override + public void importOldBytes(byte[] ov, boolean isSerialized) { + this.oldValueIsSerialized = isSerialized; + this.oldValue = ov; + this.hasOldValue = true; + } + + protected final boolean _mayAddToMultipleSerialGateways(DistributionManager dm) { + int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); + try { + LocalRegion lr = getLocalRegionForProcessing(dm); + if (lr == null) { + return false; + } + return lr.notifiesMultipleSerialGateways(); + } catch (CancelException ignore) { + return false; + } finally { + LocalRegion.setThreadInitLevelRequirement(oldLevel); + } + } } /** Custom subclass that keeps all ReplyExceptions */ 
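
A note on the old-value wire protocol above: toData and fromData now agree on a
one-byte flag (1 = serialized, 0 = raw bytes) followed by the value as a byte
array, with the flag driving oldValueIsSerialized on the receiving side instead
of being ignored. The following is a minimal, self-contained sketch of that
framing, not GemFire code: the plain int length prefix stands in for
DataSerializer.writeByteArray, and the class and method names are invented for
illustration.

import java.io.*;

public class OldValueFramingSketch {

  // Sender side: one-byte flag, then the value bytes (length-prefixed here as
  // a simplification of DataSerializer.writeByteArray).
  static void writeOldValue(boolean isSerialized, byte[] valueBytes, DataOutput out)
      throws IOException {
    out.writeByte(isSerialized ? 1 : 0);
    out.writeInt(valueBytes.length);
    out.write(valueBytes);
  }

  // Receiver side: mirrors the new fromData logic, including the strict
  // 0-or-1 check on the flag byte.
  static byte[] readOldValue(DataInput in, boolean[] isSerializedOut) throws IOException {
    byte b = in.readByte();
    if (b == 0) {
      isSerializedOut[0] = false;
    } else if (b == 1) {
      isSerializedOut[0] = true;
    } else {
      throw new IllegalStateException("expected 0 or 1");
    }
    byte[] bytes = new byte[in.readInt()];
    in.readFully(bytes);
    return bytes;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    writeOldValue(false, "raw-old-value".getBytes("UTF-8"), new DataOutputStream(bos));
    boolean[] flag = new boolean[1];
    byte[] bytes = readOldValue(
        new DataInputStream(new ByteArrayInputStream(bos.toByteArray())), flag);
    System.out.println("serialized=" + flag[0] + " value=" + new String(bytes, "UTF-8"));
  }
}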
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
index 972d43b..8a9434d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
@@ -105,6 +105,23 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     return putAllData;
   }

+  public void setPutAllEntryData(PutAllEntryData[] putAllEntryData) {
+    for (int i = 0; i < putAllEntryData.length; i++) {
+      putAllData[i] = putAllEntryData[i];
+    }
+    this.putAllDataSize = putAllEntryData.length;
+  }
+
+  /**
+   * Add an entry that this putall operation should distribute.
+   */
+  public void addEntry(PutAllEntryData putAllEntry)
+  {
+    this.putAllData[this.putAllDataSize] = putAllEntry;
+    this.putAllDataSize += 1;
+    //cachedEvents.add(ev);
+  }
+
   /**
    * Add an entry that this putall operation should distribute.
    */
@@ -179,6 +196,17 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     };
   }

+  public void freeOffHeapResources() {
+    // I do not use eventIterator here because it forces the lazy creation of EntryEventImpl by calling getEventForPosition.
+    for (int i=0; i < this.putAllDataSize; i++) {
+      PutAllEntryData entry = this.putAllData[i];
+      if (entry != null && entry.event != null) {
+        entry.event.release();
+      }
+    }
+  }
+
+
   public EntryEventImpl getEventForPosition(int position) {
     PutAllEntryData entry = this.putAllData[position];
     if (entry == null) {
@@ -188,7 +216,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       return entry.event;
     }
     LocalRegion region = (LocalRegion)this.event.getRegion();
-    EntryEventImpl ev = new EntryEventImpl(
+    EntryEventImpl ev = EntryEventImpl.create(
         region, entry.getOp(),
         entry.getKey(), null/* value */,
         this.event.getCallbackArgument(),
@@ -196,6 +224,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
         this.event.getDistributedMember(),
         this.event.isGenerateCallbacks(),
         entry.getEventID());
+    boolean returnedEv = false;
+    try {
     ev.setPossibleDuplicate(entry.isPossibleDuplicate());
     if (entry.versionTag != null && region.concurrencyChecksEnabled) {
       VersionSource id = entry.versionTag.getMemberID();
@@ -207,6 +237,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     }

     entry.event = ev;
+    returnedEv = true;
     if (entry.getValue() == null
         && ev.getRegion().getAttributes().getDataPolicy() == DataPolicy.NORMAL) {
       ev.setLocalInvalid(true);
     }
@@ -223,6 +254,11 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     ev.callbacksInvoked(entry.isCallbacksInvoked());
     ev.setTailKey(entry.getTailKey());
     return ev;
+    } finally {
+      if (!returnedEv) {
+        ev.release();
+      }
+    }
   }

   public final EntryEventImpl getBaseEvent() {
@@ -269,8 +305,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation

     public PutAllEntryData(EntryEventImpl event) {

       this.key = event.getKey();
-      this.value = event.getRawNewValue();
-      Object oldValue = event.getRawOldValue();
+      this.value = event.getRawNewValueAsHeapObject();
+      Object oldValue = event.getRawOldValueAsHeapObject();

       if (oldValue == Token.NOT_AVAILABLE || Token.isRemoved(oldValue)) {
         this.oldValue = null;
@@ -808,6 +844,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       PutAllMessage msg = new PutAllMessage();
       msg.eventId = event.getEventId();
       msg.context = event.getContext();
+      msg.setFetchFromHDFS(event.isFetchFromHDFS());
+      msg.setPutDML(event.isPutDML());
       return msg;
     }

@@ -821,7 +859,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
   public PutAllPRMessage createPRMessagesNotifyOnly(int bucketId) {
     final EntryEventImpl event = getBaseEvent();
     PutAllPRMessage prMsg = new PutAllPRMessage(bucketId, putAllDataSize, true,
-        event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
+        event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false, false /*isPutDML*/);
     if (event.getContext() != null) {
       prMsg.setBridgeContext(event.getContext());
     }
@@ -850,7 +888,8 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       PutAllPRMessage prMsg = (PutAllPRMessage)prMsgMap.get(bucketId);
       if (prMsg == null) {
         prMsg = new PutAllPRMessage(bucketId.intValue(), putAllDataSize, false,
-            event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
+            event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isFetchFromHDFS(), event.isPutDML());
+        prMsg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());

         // set dpao's context(original sender) into each PutAllMsg
         // dpao's event's context could be null if it's P2P putAll in PR
@@ -1026,6 +1065,11 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation

     protected EventID eventId = null;

+    // By default, fetchFromHDFS == true;
+    private transient boolean fetchFromHDFS = true;
+
+    private transient boolean isPutDML = false;
+
     protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;

     protected static final short SKIP_CALLBACKS = (short)(HAS_BRIDGE_CONTEXT << 1);
@@ -1045,7 +1089,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
         throws EntryNotFoundException {
       // Gester: We have to specify eventId for the message of MAP
-      EntryEventImpl event = new EntryEventImpl(
+      EntryEventImpl event = EntryEventImpl.create(
           rgn, Operation.PUTALL_UPDATE /* op */, null /* key */,
           null/* value */, this.callbackArg, true /* originRemote */, getSender());
@@ -1080,11 +1124,13 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
      *          the region the entry is put in
      */
     public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn,
-        boolean requiresRegionContext) {
+        boolean requiresRegionContext, boolean fetchFromHDFS, boolean isPutDML) {
       EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(),
           this.context, rgn, requiresRegionContext, this.possibleDuplicate,
           this.needsRouting, this.callbackArg, true, skipCallbacks);
+      ev.setFetchFromHDFS(fetchFromHDFS);
+      ev.setPutDML(isPutDML);
       // we don't need to set old value here, because the msg is from remote. local old value will get from next step
       try {
         super.basicOperateOnRegion(ev, rgn);
@@ -1094,6 +1140,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
           rgn.getVersionVector().recordVersion(getSender(), ev.getVersionTag());
         }
       }
+      ev.release();
     }
   }

@@ -1119,10 +1166,12 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
         ((KeyWithRegionContext)key).setRegionContext(rgn);
       }
       EventID evId = entry.getEventID();
-      EntryEventImpl ev = new EntryEventImpl(rgn, entry.getOp(),
+      EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(),
           key, null/* value */, callbackArg,
           originRemote, sender, !skipCallbacks,
           evId);
+      boolean returnedEv = false;
+      try {
       if (context != null) {
         ev.context = context;
       }
@@ -1147,7 +1196,13 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
        * Setting tailKey for the secondary bucket here. Tail key was update by the primary.
        */
       ev.setTailKey(entry.getTailKey());
+      returnedEv = true;
       return ev;
+      } finally {
+        if (!returnedEv) {
+          ev.release();
+        }
+      }
     }

     @Override
@@ -1168,7 +1223,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
               logger.debug("putAll processing {} with {} sender={}",
                   putAllData[i], putAllData[i].versionTag, sender);
             }
             putAllData[i].setSender(sender);
-            doEntryPut(putAllData[i], rgn, requiresRegionContext);
+            doEntryPut(putAllData[i], rgn, requiresRegionContext, fetchFromHDFS, isPutDML);
           }
         }
       }, ev.getEventId());
@@ -1212,7 +1267,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation

     @Override
     public void toData(DataOutput out) throws IOException {
-
       super.toData(out);
       DataSerializer.writeObject(this.eventId, out);
       InternalDataSerializer.writeUnsignedVL(this.putAllDataSize, out);
@@ -1297,5 +1351,25 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       }
       return Arrays.asList(ops);
     }
+
+    public void setFetchFromHDFS(boolean val) {
+      this.fetchFromHDFS = val;
+    }
+
+    public void setPutDML(boolean val) {
+      this.isPutDML = val;
+    }
+
+    @Override
+    protected short computeCompressedExtBits(short bits) {
+      bits = super.computeCompressedExtBits(bits);
+      if (fetchFromHDFS) {
+        bits |= FETCH_FROM_HDFS;
+      }
+      if (isPutDML) {
+        bits |= IS_PUT_DML;
+      }
+      return bits;
+    }
   }
 }
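
One pattern worth calling out across both files: each new
EntryEventImpl.create(...) call is now paired with release(), either
unconditionally after use (as in doEntryPut) or through a returnedEv flag in a
finally block, so a partially built event is freed on error paths while a fully
built one passes ownership to the caller. A minimal sketch of that discipline
follows; the Event class and its release() method are hypothetical stand-ins
for EntryEventImpl, not GemFire APIs.

// Hypothetical stand-in for EntryEventImpl: something holding an off-heap
// reference that must be released exactly once.
class Event {
  private boolean released;
  void release() { released = true; }
  boolean isReleased() { return released; }
}

public class CreateReleaseSketch {

  // Mirrors the returnedEv pattern from getEventForPosition/createEntryEvent:
  // release on every exit path unless ownership passes to the caller.
  static Event buildEvent(boolean failMidway) {
    Event ev = new Event(); // analogous to EntryEventImpl.create(...)
    boolean returnedEv = false;
    try {
      if (failMidway) {
        throw new IllegalStateException("failed while configuring the event");
      }
      returnedEv = true; // caller now owns ev and must release() it later
      return ev;
    } finally {
      if (!returnedEv) {
        ev.release(); // error path: free the reference before unwinding
      }
    }
  }

  public static void main(String[] args) {
    Event owned = buildEvent(false);
    System.out.println("happy path, released=" + owned.isReleased()); // false
    try {
      buildEvent(true);
    } catch (IllegalStateException e) {
      System.out.println("error path released the event before rethrow");
    }
    owned.release(); // the caller's obligation, as doEntryPut does with ev.release()
  }
}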