Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 18FC817B0C for ; Thu, 14 May 2015 19:54:53 +0000 (UTC) Received: (qmail 76589 invoked by uid 500); 14 May 2015 19:54:53 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 76552 invoked by uid 500); 14 May 2015 19:54:52 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 76543 invoked by uid 99); 14 May 2015 19:54:52 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 May 2015 19:54:52 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 58F781827B6 for ; Thu, 14 May 2015 19:54:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id MhocQ7J1xtJm for ; Thu, 14 May 2015 19:54:41 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 2BFFB43C92 for ; Thu, 14 May 2015 19:54:40 +0000 (UTC) Received: (qmail 75411 invoked by uid 99); 14 May 2015 19:54:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 May 2015 19:54:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1453E3A59; Thu, 14 May 2015 19:54:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 14 May 2015 19:54:41 -0000 Message-Id: <1e6d15187e174354835dd05c5f52d7e4@git.apache.org> In-Reply-To: <0130502c163b4f888ed51c76edc824c7@git.apache.org> References: <0130502c163b4f888ed51c76edc824c7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/47] incubator-ignite git commit: # ignite-157 # ignite-157 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/93876df9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/93876df9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/93876df9 Branch: refs/heads/ignite-889 Commit: 93876df9373c260eaa1e8f8dc9e8edbb82110810 Parents: c3f3dd1 Author: sboikov Authored: Thu May 7 11:36:38 2015 +0300 Committer: sboikov Committed: Thu May 7 11:36:38 2015 +0300 ---------------------------------------------------------------------- .../cache/distributed/dht/GridDhtTxMapping.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 25 +- .../colocated/GridDhtDetachedCacheEntry.java | 4 +- .../near/GridAbstractNearTxPrepareFuture.java | 219 ++++++ .../distributed/near/GridNearCacheEntry.java | 4 +- .../distributed/near/GridNearLockFuture.java | 5 - .../near/GridNearOptimisticTxPrepareFuture.java | 763 +++++++++++++++++++ .../GridNearPessimisticTxPrepareFuture.java | 311 ++++++++ .../cache/distributed/near/GridNearTxLocal.java | 41 +- .../near/GridNearTxPrepareFuture.java | 12 +- .../cache/transactions/IgniteTxHandler.java | 2 +- 11 files changed, 1313 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java index d207d76..ba2c35f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java @@ -28,7 +28,7 @@ import java.util.*; /** * DHT transaction mapping. */ -public class GridDhtTxMapping { +public class GridDhtTxMapping { /** Transaction nodes mapping (primary node -> related backup nodes). */ private final Map> txNodes = new GridLeanMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 5b74b31..7da6346 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1279,25 +1279,18 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); - try { - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); - - return; - } - - // Set value to detached entry. - entry.resetFromPrimary(newVal, dhtVer); - - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - } - catch (IgniteCheckedException e) { - onDone(e); + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); return; } + + // Set value to detached entry. + entry.resetFromPrimary(newVal, dhtVer); + + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); } else cctx.mvcc().markExplicitOwner(k, threadId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index 5c4dd13..2c84bd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -46,10 +46,8 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { * * @param val Value. * @param ver Version. - * @throws IgniteCheckedException If value unmarshalling failed. */ - public void resetFromPrimary(CacheObject val, GridCacheVersion ver) - throws IgniteCheckedException { + public void resetFromPrimary(CacheObject val, GridCacheVersion ver) { value(val); this.ver = ver; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java new file mode 100644 index 0000000..905f018 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridAbstractNearTxPrepareFuture.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import javax.cache.expiry.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; + +/** + * Common code for tx prepare in optimistic and pessimistic modes. + */ +public abstract class GridAbstractNearTxPrepareFuture extends GridCompoundIdentityFuture + implements GridCacheFuture { + /** Logger reference. */ + protected static final AtomicReference logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Context. */ + protected GridCacheSharedContext cctx; + + /** Future ID. */ + protected IgniteUuid futId; + + /** Transaction. */ + @GridToStringInclude + protected GridNearTxLocal tx; + + /** Error. */ + @GridToStringExclude + protected AtomicReference err = new AtomicReference<>(null); + + /** Trackable flag. */ + protected boolean trackable = true; + + /** Full information about transaction nodes mapping. */ + protected GridDhtTxMapping txMapping; + + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridAbstractNearTxPrepareFuture(GridCacheSharedContext cctx, final GridNearTxLocal tx) { + super(cctx.kernalContext(), new IgniteReducer() { + @Override public boolean collect(IgniteInternalTx e) { + return true; + } + + @Override public IgniteInternalTx reduce() { + // Nothing to aggregate. + return tx; + } + }); + + assert cctx != null; + assert tx != null; + + this.cctx = cctx; + this.tx = tx; + + futId = IgniteUuid.randomUuid(); + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridAbstractNearTxPrepareFuture.class); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return tx.xidVersion(); + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + trackable = false; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return trackable; + } + + /** + * Prepares transaction. + */ + public abstract void prepare(); + + /** + * @param nodeId Sender. + * @param res Result. + */ + public abstract void onResult(UUID nodeId, GridNearTxPrepareResponse res); + + /** + * Checks if mapped transaction can be committed on one phase. + * One-phase commit can be done if transaction maps to one primary node and not more than one backup. + */ + protected final void checkOnePhase() { + if (tx.storeUsed()) + return; + + Map> map = txMapping.transactionNodes(); + + if (map.size() == 1) { + Map.Entry> entry = F.firstEntry(map); + + assert entry != null; + + Collection backups = entry.getValue(); + + if (backups.size() <= 1) + tx.onePhaseCommit(true); + } + } + + /** + * @param m Mapping. + * @param res Response. + */ + protected final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) { + assert res.error() == null : res; + assert F.isEmpty(res.invalidPartitions()) : res; + + for (Map.Entry entry : res.ownedValues().entrySet()) { + IgniteTxEntry txEntry = tx.entry(entry.getKey()); + + assert txEntry != null; + + GridCacheContext cacheCtx = txEntry.context(); + + while (true) { + try { + if (cacheCtx.isNear()) { + GridNearCacheEntry nearEntry = (GridNearCacheEntry)txEntry.cached(); + + CacheVersionedValue tup = entry.getValue(); + + nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(), + tup.version(), m.node().id(), tx.topologyVersion()); + } + else if (txEntry.cached().detached()) { + GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached(); + + CacheVersionedValue tup = entry.getValue(); + + detachedEntry.resetFromPrimary(tup.value(), tx.xidVersion()); + } + + break; + } + catch (GridCacheEntryRemovedException ignored) { + // Retry. + } + } + } + + tx.implicitSingleResult(res.returnValue()); + + for (IgniteTxKey key : res.filterFailedKeys()) { + IgniteTxEntry txEntry = tx.entry(key); + + assert txEntry != null : "Missing tx entry for write key: " + key; + + txEntry.op(NOOP); + + assert txEntry.context() != null; + + ExpiryPolicy expiry = txEntry.context().expiryForTxEntry(txEntry); + + if (expiry != null) + txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess())); + } + + if (!m.empty()) { + // Register DHT version. + tx.addDhtVersion(m.node().id(), res.dhtVersion()); + + m.dhtVersion(res.dhtVersion()); + + if (m.near()) + tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index c7fa4ab..797fd32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -204,15 +204,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * @param topVer Topology version. * @return {@code True} if reset was done. * @throws GridCacheEntryRemovedException If obsolete. - * @throws IgniteCheckedException If failed. */ - @SuppressWarnings( {"RedundantTypeArguments"}) public boolean resetFromPrimary(CacheObject val, GridCacheVersion ver, GridCacheVersion dhtVer, UUID primaryNodeId, AffinityTopologyVersion topVer) - throws GridCacheEntryRemovedException, IgniteCheckedException + throws GridCacheEntryRemovedException { assert dhtVer != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index a427b65..25bd76b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1450,11 +1450,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture { + /** */ + private Collection lockKeys = new GridConcurrentHashSet<>(); + + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.optimistic() : tx; + } + + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + if (log.isDebugEnabled()) + log.debug("Transaction future received owner changed callback: " + entry); + + if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { + lockKeys.remove(entry.txKey()); + + // This will check for locks. + onDone(); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Collection nodes() { + return F.viewReadOnly(futures(), new IgniteClosure, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture f) { + if (isMini(f)) + return ((MiniFuture)f).node(); + + return cctx.discovery().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteInternalFuture fut : futures()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture) fut; + + if (f.node().id().equals(nodeId)) { + f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + + found = true; + } + } + } + + return found; + } + + /** + * @param nodeId Failed node ID. + * @param mappings Remaining mappings. + * @param e Error. + */ + void onError(@Nullable UUID nodeId, @Nullable Iterable mappings, Throwable e) { + if (err.compareAndSet(null, e)) { + boolean marked = tx.setRollbackOnly(); + + if (e instanceof IgniteTxOptimisticCheckedException) { + assert nodeId != null : "Missing node ID for optimistic failure exception: " + e; + + tx.removeKeysMapping(nodeId, mappings); + } + + if (e instanceof IgniteTxRollbackCheckedException) { + if (marked) { + try { + tx.rollback(); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to automatically rollback transaction: " + tx, ex); + } + } + } + + onComplete(); + } + } + + /** + * @return {@code True} if all locks are owned. + */ + private boolean checkLocks() { + boolean locked = lockKeys.isEmpty(); + + if (locked) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); + } + + return locked; + } + + /** {@inheritDoc} */ + @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (!isDone()) { + for (IgniteInternalFuture fut : pending()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + f.onResult(nodeId, res); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(IgniteInternalTx t, Throwable err) { + // If locks were not acquired yet, delay completion. + if (isDone() || (err == null && !checkLocks())) + return false; + + this.err.compareAndSet(null, err); + + if (err == null) + tx.state(PREPARED); + + if (super.onDone(tx, err)) { + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteInternalFuture f) { + return f.getClass().equals(MiniFuture.class); + } + + /** + * Completeness callback. + */ + private void onComplete() { + if (super.onDone(tx, err.get())) + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + } + + /** {@inheritDoc} */ + @Override public void prepare() { + // Obtain the topology version to use. + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + if (topVer != null) { + tx.topologyVersion(topVer); + + prepare0(); + + return; + } + + prepareOnTopology(); + } + + /** + * + */ + private void prepareOnTopology() { + GridDhtTopologyFuture topFut = topologyReadLock(); + + try { + if (topFut == null) { + assert isDone(); + + return; + } + + if (topFut.isDone()) { + StringBuilder invalidCaches = new StringBuilder(); + + boolean cacheInvalid = false; + + for (GridCacheContext ctx : cctx.cacheContexts()) { + if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx)) { + if (cacheInvalid) + invalidCaches.append(", "); + + invalidCaches.append(U.maskName(ctx.name())); + + cacheInvalid = true; + } + } + + if (cacheInvalid) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString())); + + return; + } + + tx.topologyVersion(topFut.topologyVersion()); + + prepare0(); + } + else { + topFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture t) { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override public void run() { + prepareOnTopology(); + } + }); + } + }); + } + } + finally { + topologyReadUnlock(); + } + } + + /** + * Acquires topology read lock. + * + * @return Topology ready future. + */ + private GridDhtTopologyFuture topologyReadLock() { + if (tx.activeCacheIds().isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** + * Releases topology read lock. + */ + private void topologyReadUnlock() { + if (!tx.activeCacheIds().isEmpty()) { + GridCacheContext nonLocCtx = null; + + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** + * Initializes future. + */ + private void prepare0() { + try { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + // Make sure to add future before calling prepare. + cctx.mvcc().addFuture(this); + + prepare( + tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.emptyList(), + tx.writeEntries()); + + markInitialized(); + } + catch (TransactionTimeoutException | TransactionOptimisticException e) { + onError(cctx.localNodeId(), null, e); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * @param reads Read entries. + * @param writes Write entries. + * @throws IgniteCheckedException If transaction is group-lock and some key was mapped to to the local node. + */ + private void prepare( + Iterable reads, + Iterable writes + ) throws IgniteCheckedException { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer.topologyVersion() > 0; + + txMapping = new GridDhtTxMapping(); + + ConcurrentLinkedDeque8 mappings = + new ConcurrentLinkedDeque8<>(); + + if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { + onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name())); + + return; + } + } + } + + // Assign keys to primary nodes. + GridDistributedTxMapping cur = null; + + for (IgniteTxEntry read : reads) { + GridDistributedTxMapping updated = map(read, topVer, cur, false); + + if (cur != updated) { + mappings.offer(updated); + + if (updated.node().isLocal()) { + if (read.context().isNear()) + tx.nearLocallyMapped(true); + else if (read.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + cur = updated; + } + } + + for (IgniteTxEntry write : writes) { + GridDistributedTxMapping updated = map(write, topVer, cur, true); + + if (cur != updated) { + mappings.offer(updated); + + if (updated.node().isLocal()) { + if (write.context().isNear()) + tx.nearLocallyMapped(true); + else if (write.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + cur = updated; + } + } + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } + + tx.addEntryMapping(mappings); + + cctx.mvcc().recheckPendingLocks(); + + txMapping.initLast(mappings); + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + proceedPrepare(mappings); + } + + /** + * Continues prepare after previous mapping successfully finished. + * + * @param mappings Queue of mappings. + */ + private void proceedPrepare(final ConcurrentLinkedDeque8 mappings) { + if (isDone()) + return; + + final GridDistributedTxMapping m = mappings.poll(); + + if (m == null) + return; + + assert !m.empty(); + + final ClusterNode n = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + tx.optimistic() && tx.serializable() ? m.reads() : null, + m.writes(), + tx.groupLockKey(), + tx.partitionLock(), + m.near(), + txMapping.transactionNodes(), + m.last(), + m.lastBackups(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + // Must lock near entries separately. + if (m.near()) { + try { + tx.optimisticLockEntries(req.writes()); + + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + onError(null, null, e); + } + } + + final MiniFuture fut = new MiniFuture(m, mappings); + + req.miniId(fut.futureId()); + + add(fut); // Append new future. + + // If this is the primary node for the keys. + if (n.isLocal()) { + // At this point, if any new node joined, then it is + // waiting for this transaction to complete, so + // partition reassignments are not possible here. + cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1() { + @Override public void apply(GridNearTxPrepareResponse res) { + fut.onResult(n.id(), res); + } + }); + } + else { + assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx + + ", nodeId=" + n.id() + ']'; + + try { + cctx.io().send(n, req, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + fut.onResult(e); + } + } + } + + /** + * @param entry Transaction entry. + * @param topVer Topology version. + * @param cur Current mapping. + * @throws IgniteCheckedException If transaction is group-lock and local node is not primary for key. + * @return Mapping. + */ + private GridDistributedTxMapping map( + IgniteTxEntry entry, + AffinityTopologyVersion topVer, + GridDistributedTxMapping cur, + boolean waitLock + ) throws IgniteCheckedException { + GridCacheContext cacheCtx = entry.context(); + + List nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + + txMapping.addMapping(nodes); + + ClusterNode primary = F.first(nodes); + + assert primary != null; + + if (log.isDebugEnabled()) { + log.debug("Mapped key to primary node [key=" + entry.key() + + ", part=" + cacheCtx.affinity().partition(entry.key()) + + ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); + } + + if (tx.groupLock() && !primary.isLocal()) + throw new IgniteCheckedException("Failed to prepare group lock transaction (local node is not primary for " + + " key)[key=" + entry.key() + ", primaryNodeId=" + primary.id() + ']'); + + // Must re-initialize cached entry while holding topology lock. + if (cacheCtx.isNear()) + entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); + else if (!cacheCtx.isLocal()) + entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); + else + entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); + + if (cacheCtx.isNear() || cacheCtx.isLocal()) { + if (waitLock && entry.explicitVersion() == null) { + if (!tx.groupLock() || tx.groupLockKey().equals(entry.txKey())) + lockKeys.add(entry.txKey()); + } + } + + if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { + cur = new GridDistributedTxMapping(primary); + + // Initialize near flag right away. + cur.near(cacheCtx.isNear()); + } + + cur.add(entry); + + if (entry.explicitVersion() != null) { + tx.markExplicit(primary.id()); + + cur.markExplicitLock(); + } + + entry.nodeId(primary.id()); + + if (cacheCtx.isNear()) { + while (true) { + try { + GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached(); + + cached.dhtNodeId(tx.xidVersion(), primary.id()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + entry.cached(cacheCtx.near().entryEx(entry.key())); + } + } + } + + return cur; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearOptimisticTxPrepareFuture.class, this, super.toString()); + } + + /** + * + */ + private class MiniFuture extends GridFutureAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Keys. */ + @GridToStringInclude + private GridDistributedTxMapping m; + + /** Flag to signal some result being processed. */ + private AtomicBoolean rcvRes = new AtomicBoolean(false); + + /** Mappings to proceed prepare. */ + private ConcurrentLinkedDeque8 mappings; + + /** + * @param m Mapping. + * @param mappings Queue of mappings to proceed with. + */ + MiniFuture( + GridDistributedTxMapping m, + ConcurrentLinkedDeque8 mappings + ) { + this.m = m; + this.mappings = mappings; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return m.node(); + } + + /** + * @return Keys. + */ + public GridDistributedTxMapping mapping() { + return m; + } + + /** + * @param e Error. + */ + void onResult(Throwable e) { + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); + + // Fail. + onDone(e); + } + else + U.warn(log, "Received error after another result has been processed [fut=" + + GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e); + } + + /** + * @param e Node failure. + */ + void onResult(ClusterTopologyCheckedException e) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); + + // Fail the whole future (make sure not to remap on different primary node + // to prevent multiple lock coordinators). + onError(null, null, e); + } + } + + /** + * @param nodeId Failed node ID. + * @param res Result callback. + */ + void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (res.error() != null) { + // Fail the whole compound future. + onError(nodeId, mappings, res.error()); + } + else { + onPrepareResponse(m, res); + + // Proceed prepare before finishing mini future. + if (mappings != null) + proceedPrepare(mappings); + + // Finish this mini future. + onDone(tx); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java new file mode 100644 index 0000000..84a4ab8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.transactions.*; +import org.apache.ignite.internal.transactions.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; +import static org.apache.ignite.transactions.TransactionState.*; + +/** + * + */ +public class GridNearPessimisticTxPrepareFuture extends GridAbstractNearTxPrepareFuture { + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearPessimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.pessimistic() : tx; + + // Should wait for all mini futures completion before finishing tx. + ignoreChildFailures(IgniteCheckedException.class); + } + + /** {@inheritDoc} */ + @Override public Collection nodes() { + return F.viewReadOnly(futures(), new IgniteClosure, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture f) { + return ((MiniFuture)f).node(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteInternalFuture fut : futures()) { + MiniFuture f = (MiniFuture)fut; + + if (f.node().id().equals(nodeId)) { + f.onError(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId)); + + found = true; + } + } + + return found; + } + + /** {@inheritDoc} */ + @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (!isDone()) { + for (IgniteInternalFuture fut : pending()) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f); + + f.onResult(res); + } + } + } + } + /** {@inheritDoc} */ + @Override public void prepare() { + if (!tx.state(PREPARING)) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx)); + else + onDone(new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onDone(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + try { + tx.userPrepare(); + + cctx.mvcc().addFuture(this); + + preparePessimistic(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * + */ + private void preparePessimistic() { + Map, GridDistributedTxMapping> mappings = new HashMap<>(); + + AffinityTopologyVersion topVer = tx.topologyVersion(); + + txMapping = new GridDhtTxMapping(); + + for (IgniteTxEntry txEntry : tx.allEntries()) { + GridCacheContext cacheCtx = txEntry.context(); + + List nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer); + + ClusterNode primary = F.first(nodes); + + boolean near = cacheCtx.isNear(); + + IgniteBiTuple key = F.t(primary, near); + + GridDistributedTxMapping nodeMapping = mappings.get(key); + + if (nodeMapping == null) { + nodeMapping = new GridDistributedTxMapping(primary); + + nodeMapping.near(cacheCtx.isNear()); + + mappings.put(key, nodeMapping); + } + + txEntry.nodeId(primary.id()); + + nodeMapping.add(txEntry); + + txMapping.addMapping(nodes); + } + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + for (final GridDistributedTxMapping m : mappings.values()) { + final ClusterNode node = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + m.reads(), + m.writes(), + /*grp lock key*/null, + /*part lock*/false, + m.near(), + txMapping.transactionNodes(), + true, + txMapping.transactionNodes().get(node.id()), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + final MiniFuture fut = new MiniFuture(m); + + req.miniId(fut.futureId()); + + add(fut); + + if (node.isLocal()) { + cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1() { + @Override public void apply(GridNearTxPrepareResponse res) { + fut.onResult(res); + } + }); + } + else { + try { + cctx.io().send(node, req, tx.ioPolicy()); + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + fut.onError(e); + } + } + } + + markInitialized(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) { + if (err != null) + this.err.compareAndSet(null, err); + + err = this.err.get(); + + if (err == null) + tx.state(PREPARED); + + if (super.onDone(res, err)) { + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearPessimisticTxPrepareFuture.class, this, super.toString()); + } + + /** + * + */ + private class MiniFuture extends GridFutureAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** */ + private GridDistributedTxMapping m; + + /** + * @param m Mapping. + */ + MiniFuture(GridDistributedTxMapping m) { + this.m = m; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return m.node(); + } + + /** + * @param res Response. + */ + void onResult(GridNearTxPrepareResponse res) { + if (res.error() != null) + onError(res.error()); + else { + onPrepareResponse(m, res); + + onDone(tx); + } + } + + /** + * @param e Error. + */ + void onError(Throwable e) { + err.compareAndSet(null, e); + + onDone(e); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c665354..f7a43bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -683,11 +683,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture prepareAsync() { - GridNearTxPrepareFuture fut = (GridNearTxPrepareFuture)prepFut.get(); + GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)prepFut.get(); if (fut == null) { // Future must be created before any exception can be thrown. - fut = new GridNearTxPrepareFuture<>(cctx, this); + fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) : + new GridNearPessimisticTxPrepareFuture(cctx, this); if (!prepFut.compareAndSet(null, fut)) return prepFut.get(); @@ -698,41 +699,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { mapExplicitLocks(); - // For pessimistic mode we don't distribute prepare request and do not lock topology version - // as it was fixed on first lock. - if (pessimistic()) { - if (!state(PREPARING)) { - if (setRollbackOnly()) { - if (timedOut()) - fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was " + - "rolled back: " + this)); - else - fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + - state() + ", tx=" + this + ']')); - } - else - fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare " + - "[state=" + state() + ", tx=" + this + ']')); - - return fut; - } - - try { - userPrepare(); - - // Make sure to add future before calling prepare. - cctx.mvcc().addFuture(fut); - - fut.prepare(); - } - catch (IgniteCheckedException e) { - fut.onError(e); - } - } - else { - // In optimistic mode we must wait for topology map update. - fut.prepare(); - } + fut.prepare(); return fut; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index f573187..962d973 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -79,7 +79,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut private boolean trackable = true; /** Full information about transaction nodes mapping. */ - private GridDhtTxMapping txMapping; + private GridDhtTxMapping txMapping; /** */ private Collection lockKeys = new GridConcurrentHashSet<>(); @@ -497,7 +497,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut assert topVer.topologyVersion() > 0; - txMapping = new GridDhtTxMapping<>(); + txMapping = new GridDhtTxMapping(); ConcurrentLinkedDeque8 mappings = new ConcurrentLinkedDeque8<>(); @@ -580,7 +580,7 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut AffinityTopologyVersion topVer = tx.topologyVersion(); - txMapping = new GridDhtTxMapping<>(); + txMapping = new GridDhtTxMapping(); for (IgniteTxEntry txEntry : tx.allEntries()) { GridCacheContext cacheCtx = txEntry.context(); @@ -996,12 +996,6 @@ public final class GridNearTxPrepareFuture extends GridCompoundIdentityFut catch (GridCacheEntryRemovedException ignored) { // Retry. } - catch (IgniteCheckedException e) { - // Fail the whole compound future. - onError(nodeId, mappings, e); - - return; - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/93876df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index af75fb8..d98b4ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -362,7 +362,7 @@ public class IgniteTxHandler { * @param res Response. */ private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) { - GridNearTxPrepareFuture fut = (GridNearTxPrepareFuture)ctx.mvcc() + GridAbstractNearTxPrepareFuture fut = (GridAbstractNearTxPrepareFuture)ctx.mvcc() .future(res.version(), res.futureId()); if (fut == null) {