ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [12/50] [abbrv] ignite git commit: Merge branch master into ignite-264
Date Thu, 03 Sep 2015 01:03:21 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 0000000,234121b..95e1847
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@@ -1,0 -1,817 +1,820 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.transactions.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.transactions.*;
+ import org.jetbrains.annotations.*;
+ import org.jsr166.*;
+ 
+ import java.util.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+ import static org.apache.ignite.transactions.TransactionState.*;
+ 
+ /**
+  *
+  */
+ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAdapter
+     implements GridCacheMvccFuture<IgniteInternalTx> {
+     /** */
+     @GridToStringInclude
+     private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ 
+     /**
+      * @param cctx Context.
+      * @param tx Transaction.
+      */
+     public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal
tx) {
+         super(cctx, tx);
+ 
+         assert tx.optimistic() : tx;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate
owner) {
+         if (log.isDebugEnabled())
+             log.debug("Transaction future received owner changed callback: " + entry);
+ 
+         if ((entry.context().isNear() || entry.context().isLocal()) && owner !=
null && tx.hasWriteKey(entry.txKey())) {
+             lockKeys.remove(entry.txKey());
+ 
+             // This will check for locks.
+             onDone();
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public Collection<? extends ClusterNode> nodes() {
+         return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>,
ClusterNode>() {
+             @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f)
{
+                 if (isMini(f))
+                     return ((MiniFuture)f).node();
+ 
+                 return cctx.discovery().localNode();
+             }
+         });
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onNodeLeft(UUID nodeId) {
+         boolean found = false;
+ 
+         for (IgniteInternalFuture<?> fut : futures()) {
+             if (isMini(fut)) {
+                 MiniFuture f = (MiniFuture) fut;
+ 
+                 if (f.node().id().equals(nodeId)) {
+                     f.onResult(new ClusterTopologyCheckedException("Remote node left grid:
" + nodeId));
+ 
+                     found = true;
+                 }
+             }
+         }
+ 
+         return found;
+     }
+ 
+     /**
+      * @param nodeId Failed node ID.
+      * @param mappings Remaining mappings.
+      * @param e Error.
+      */
+     void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping>
mappings, Throwable e) {
++        if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class))
{
++            if (tx.onePhaseCommit())
++                tx.markForBackupCheck();
++        }
++
+         if (err.compareAndSet(null, e)) {
+             boolean marked = tx.setRollbackOnly();
+ 
+             if (e instanceof IgniteTxOptimisticCheckedException) {
+                 assert nodeId != null : "Missing node ID for optimistic failure exception:
" + e;
+ 
+                 tx.removeKeysMapping(nodeId, mappings);
+             }
+ 
+             if (e instanceof IgniteTxRollbackCheckedException) {
+                 if (marked) {
+                     try {
+                         tx.rollback();
+                     }
+                     catch (IgniteCheckedException ex) {
+                         U.error(log, "Failed to automatically rollback transaction: " +
tx, ex);
+                     }
+                 }
+             }
+ 
+             onComplete();
+         }
+     }
+ 
+     /**
+      * @return {@code True} if all locks are owned.
+      */
+     private boolean checkLocks() {
+         boolean locked = lockKeys.isEmpty();
+ 
+         if (locked) {
+             if (log.isDebugEnabled())
+                 log.debug("All locks are acquired for near prepare future: " + this);
+         }
+         else {
+             if (log.isDebugEnabled())
+                 log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys
+ ']');
+         }
+ 
+         return locked;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+         if (!isDone()) {
+             for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+                 if (isMini(fut)) {
+                     MiniFuture f = (MiniFuture)fut;
+ 
+                     if (f.futureId().equals(res.miniId())) {
+                         assert f.node().id().equals(nodeId);
+ 
+                         f.onResult(nodeId, res);
+                     }
+                 }
+             }
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
+         // If locks were not acquired yet, delay completion.
+         if (isDone() || (err == null && !checkLocks()))
+             return false;
+ 
+         this.err.compareAndSet(null, err);
+ 
+         if (err == null)
+             tx.state(PREPARED);
+ 
+         if (super.onDone(tx, err)) {
+             // Don't forget to clean up.
+             cctx.mvcc().removeFuture(this);
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * @param f Future.
+      * @return {@code True} if mini-future.
+      */
+     private boolean isMini(IgniteInternalFuture<?> f) {
+         return f.getClass().equals(MiniFuture.class);
+     }
+ 
+     /**
+      * Completeness callback.
+      */
+     private void onComplete() {
+         if (super.onDone(tx, err.get()))
+             // Don't forget to clean up.
+             cctx.mvcc().removeFuture(this);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void prepare() {
+         // Obtain the topology version to use.
+         AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ 
+         if (topVer != null) {
+             tx.topologyVersion(topVer);
+ 
+             prepare0(false);
+ 
+             return;
+         }
+ 
+         prepareOnTopology(false, null);
+     }
+ 
+     /**
+      * @param remap Remap flag.
+      * @param c Optional closure to run after map.
+      */
+     private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
+         GridDhtTopologyFuture topFut = topologyReadLock();
+ 
+         try {
+             if (topFut == null) {
+                 assert isDone();
+ 
+                 return;
+             }
+ 
+             if (topFut.isDone()) {
+                 StringBuilder invalidCaches = new StringBuilder();
+ 
+                 boolean cacheInvalid = false;
+ 
+                 for (GridCacheContext ctx : cctx.cacheContexts()) {
+                     if (tx.activeCacheIds().contains(ctx.cacheId()) && !topFut.isCacheTopologyValid(ctx))
{
+                         if (cacheInvalid)
+                             invalidCaches.append(", ");
+ 
+                         invalidCaches.append(U.maskName(ctx.name()));
+ 
+                         cacheInvalid = true;
+                     }
+                 }
+ 
+                 if (cacheInvalid) {
+                     onDone(new IgniteCheckedException("Failed to perform cache operation
(cache topology is not valid): " +
+                         invalidCaches.toString()));
+ 
+                     return;
+                 }
+ 
+                 if (remap)
+                     tx.onRemap(topFut.topologyVersion());
+                 else
+                     tx.topologyVersion(topFut.topologyVersion());
+ 
+                 prepare0(remap);
+ 
+                 if (c != null)
+                     c.run();
+             }
+             else {
+                 topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>()
{
+                     @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion>
t) {
+                         cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable()
{
+                             @Override public void run() {
+                                 prepareOnTopology(remap, c);
+                             }
+                         });
+                     }
+                 });
+             }
+         }
+         finally {
+             topologyReadUnlock();
+         }
+     }
+ 
+     /**
+      * Acquires topology read lock.
+      *
+      * @return Topology ready future.
+      */
+     private GridDhtTopologyFuture topologyReadLock() {
+         if (tx.activeCacheIds().isEmpty())
+             return cctx.exchange().lastTopologyFuture();
+ 
+         GridCacheContext<?, ?> nonLocCtx = null;
+ 
+         for (int cacheId : tx.activeCacheIds()) {
+             GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+ 
+             if (!cacheCtx.isLocal()) {
+                 nonLocCtx = cacheCtx;
+ 
+                 break;
+             }
+         }
+ 
+         if (nonLocCtx == null)
+             return cctx.exchange().lastTopologyFuture();
+ 
+         nonLocCtx.topology().readLock();
+ 
+         if (nonLocCtx.topology().stopping()) {
+             onDone(new IgniteCheckedException("Failed to perform cache operation (cache
is stopped): " +
+                 nonLocCtx.name()));
+ 
+             return null;
+         }
+ 
+         return nonLocCtx.topology().topologyVersionFuture();
+     }
+ 
+     /**
+      * Releases topology read lock.
+      */
+     private void topologyReadUnlock() {
+         if (!tx.activeCacheIds().isEmpty()) {
+             GridCacheContext<?, ?> nonLocCtx = null;
+ 
+             for (int cacheId : tx.activeCacheIds()) {
+                 GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+ 
+                 if (!cacheCtx.isLocal()) {
+                     nonLocCtx = cacheCtx;
+ 
+                     break;
+                 }
+             }
+ 
+             if (nonLocCtx != null)
+                 nonLocCtx.topology().readUnlock();
+         }
+     }
+ 
+     /**
+      * Initializes future.
+      *
+      * @param remap Remap flag.
+      */
+     private void prepare0(boolean remap) {
+         try {
+             boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+ 
+             if (!txStateCheck) {
+                 if (tx.setRollbackOnly()) {
+                     if (tx.timedOut())
+                         onError(null, null, new IgniteTxTimeoutCheckedException("Transaction
timed out and " +
+                             "was rolled back: " + this));
+                     else
+                         onError(null, null, new IgniteCheckedException("Invalid transaction
state for prepare " +
+                             "[state=" + tx.state() + ", tx=" + this + ']'));
+                 }
+                 else
+                     onError(null, null, new IgniteTxRollbackCheckedException("Invalid transaction
state for " +
+                         "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+ 
+                 return;
+             }
+ 
+             // Make sure to add future before calling prepare.
+             if (!remap)
+                 cctx.mvcc().addFuture(this);
+ 
+             prepare(
+                 tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
+                 tx.writeEntries());
+ 
+             markInitialized();
+         }
+         catch (TransactionTimeoutException | TransactionOptimisticException e) {
+             onError(cctx.localNodeId(), null, e);
+         }
+         catch (IgniteCheckedException e) {
+             onDone(e);
+         }
+     }
+ 
+     /**
+      * @param reads Read entries.
+      * @param writes Write entries.
 -     * @throws IgniteCheckedException If transaction is group-lock and some key was mapped
to to the local node.
+      */
+     private void prepare(
+         Iterable<IgniteTxEntry> reads,
+         Iterable<IgniteTxEntry> writes
+     ) throws IgniteCheckedException {
+         AffinityTopologyVersion topVer = tx.topologyVersion();
+ 
+         assert topVer.topologyVersion() > 0;
+ 
+         txMapping = new GridDhtTxMapping();
+ 
+         ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>();
+ 
+         if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+             for (int cacheId : tx.activeCacheIds()) {
+                 GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+ 
+                 if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
+                     onDone(new ClusterTopologyCheckedException("Failed to map keys for cache
(all " +
+                         "partition nodes left the grid): " + cacheCtx.name()));
+ 
+                     return;
+                 }
+             }
+         }
+ 
+         // Assign keys to primary nodes.
+         GridDistributedTxMapping cur = null;
+ 
+         for (IgniteTxEntry read : reads) {
+             GridDistributedTxMapping updated = map(read, topVer, cur, false);
+ 
+             if (cur != updated) {
+                 mappings.offer(updated);
+ 
+                 if (updated.node().isLocal()) {
+                     if (read.context().isNear())
+                         tx.nearLocallyMapped(true);
+                     else if (read.context().isColocated())
+                         tx.colocatedLocallyMapped(true);
+                 }
+ 
+                 cur = updated;
+             }
+         }
+ 
+         for (IgniteTxEntry write : writes) {
+             GridDistributedTxMapping updated = map(write, topVer, cur, true);
+ 
+             if (cur != updated) {
+                 mappings.offer(updated);
+ 
+                 if (updated.node().isLocal()) {
+                     if (write.context().isNear())
+                         tx.nearLocallyMapped(true);
+                     else if (write.context().isColocated())
+                         tx.colocatedLocallyMapped(true);
+                 }
+ 
+                 cur = updated;
+             }
+         }
+ 
+         if (isDone()) {
+             if (log.isDebugEnabled())
+                 log.debug("Abandoning (re)map because future is done: " + this);
+ 
+             return;
+         }
+ 
+         tx.addEntryMapping(mappings);
+ 
+         cctx.mvcc().recheckPendingLocks();
+ 
+         txMapping.initLast(mappings);
+ 
+         tx.transactionNodes(txMapping.transactionNodes());
+ 
+         checkOnePhase();
+ 
+         proceedPrepare(mappings);
+     }
+ 
+     /**
+      * Continues prepare after previous mapping successfully finished.
+      *
+      * @param mappings Queue of mappings.
+      */
+     private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping>
mappings) {
+         if (isDone())
+             return;
+ 
+         final GridDistributedTxMapping m = mappings.poll();
+ 
+         if (m == null)
+             return;
+ 
+         assert !m.empty();
+ 
+         final ClusterNode n = m.node();
+ 
+         GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+             futId,
+             tx.topologyVersion(),
+             tx,
+             tx.optimistic() && tx.serializable() ? m.reads() : null,
+             m.writes(),
+             m.near(),
+             txMapping.transactionNodes(),
+             m.last(),
+             m.lastBackups(),
+             tx.onePhaseCommit(),
+             tx.needReturnValue() && tx.implicit(),
+             tx.implicitSingle(),
+             m.explicitLock(),
+             tx.subjectId(),
+             tx.taskNameHash(),
+             m.clientFirst());
+ 
+         for (IgniteTxEntry txEntry : m.writes()) {
+             if (txEntry.op() == TRANSFORM)
+                 req.addDhtVersion(txEntry.txKey(), null);
+         }
+ 
+         // Must lock near entries separately.
+         if (m.near()) {
+             try {
+                 tx.optimisticLockEntries(req.writes());
+ 
+                 tx.userPrepare();
+             }
+             catch (IgniteCheckedException e) {
+                 onError(null, null, e);
+             }
+         }
+ 
+         final MiniFuture fut = new MiniFuture(m, mappings);
+ 
+         req.miniId(fut.futureId());
+ 
+         add(fut); // Append new future.
+ 
+         // If this is the primary node for the keys.
+         if (n.isLocal()) {
+             // At this point, if any new node joined, then it is
+             // waiting for this transaction to complete, so
+             // partition reassignments are not possible here.
+             IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(),
tx, req);
+ 
+             prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>()
{
+                 @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse>
prepFut) {
+                     try {
+                         fut.onResult(n.id(), prepFut.get());
+                     }
+                     catch (IgniteCheckedException e) {
+                         fut.onResult(e);
+                     }
+                 }
+             });
+         }
+         else {
+             try {
+                 cctx.io().send(n, req, tx.ioPolicy());
+             }
+             catch (IgniteCheckedException e) {
+                 // Fail the whole thing.
+                 fut.onResult(e);
+             }
+         }
+     }
+ 
+     /**
+      * @param entry Transaction entry.
+      * @param topVer Topology version.
+      * @param cur Current mapping.
+      * @param waitLock Wait lock flag.
 -     * @throws IgniteCheckedException If transaction is group-lock and local node is not
primary for key.
+      * @return Mapping.
+      */
+     private GridDistributedTxMapping map(
+         IgniteTxEntry entry,
+         AffinityTopologyVersion topVer,
+         @Nullable GridDistributedTxMapping cur,
+         boolean waitLock
 -    ) throws IgniteCheckedException {
++    ) {
+         GridCacheContext cacheCtx = entry.context();
+ 
+         List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+ 
+         txMapping.addMapping(nodes);
+ 
+         ClusterNode primary = F.first(nodes);
+ 
+         assert primary != null;
+ 
+         if (log.isDebugEnabled()) {
+             log.debug("Mapped key to primary node [key=" + entry.key() +
+                 ", part=" + cacheCtx.affinity().partition(entry.key()) +
+                 ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
+         }
+ 
+         // Must re-initialize cached entry while holding topology lock.
+         if (cacheCtx.isNear())
+             entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
+         else if (!cacheCtx.isLocal())
+             entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
+         else
+             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
+ 
+         if (cacheCtx.isNear() || cacheCtx.isLocal()) {
+             if (waitLock && entry.explicitVersion() == null)
+                 lockKeys.add(entry.txKey());
+         }
+ 
+         if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear())
{
+             boolean clientFirst = cur == null && cctx.kernalContext().clientNode();
+ 
+             cur = new GridDistributedTxMapping(primary);
+ 
+             // Initialize near flag right away.
+             cur.near(cacheCtx.isNear());
+ 
+             cur.clientFirst(clientFirst);
+         }
+ 
+         cur.add(entry);
+ 
+         if (entry.explicitVersion() != null) {
+             tx.markExplicit(primary.id());
+ 
+             cur.markExplicitLock();
+         }
+ 
+         entry.nodeId(primary.id());
+ 
+         if (cacheCtx.isNear()) {
+             while (true) {
+                 try {
+                     GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
+ 
+                     cached.dhtNodeId(tx.xidVersion(), primary.id());
+ 
+                     break;
+                 }
+                 catch (GridCacheEntryRemovedException ignore) {
+                     entry.cached(cacheCtx.near().entryEx(entry.key()));
+                 }
+             }
+         }
+ 
+         return cur;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>,
String>() {
+             @Override public String apply(IgniteInternalFuture<?> f) {
+                 return "[node=" + ((MiniFuture)f).node().id() +
+                     ", loc=" + ((MiniFuture)f).node().isLocal() +
+                     ", done=" + f.isDone() + "]";
+             }
+         });
+ 
+         return S.toString(GridNearOptimisticTxPrepareFuture.class, this,
+             "innerFuts", futs,
+             "tx", tx,
+             "super", super.toString());
+     }
+ 
+     /**
+      *
+      */
+     private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** */
+         private final IgniteUuid futId = IgniteUuid.randomUuid();
+ 
+         /** Keys. */
+         @GridToStringInclude
+         private GridDistributedTxMapping m;
+ 
+         /** Flag to signal some result being processed. */
+         private AtomicBoolean rcvRes = new AtomicBoolean(false);
+ 
+         /** Mappings to proceed prepare. */
+         private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings;
+ 
+         /**
+          * @param m Mapping.
+          * @param mappings Queue of mappings to proceed with.
+          */
+         MiniFuture(
+             GridDistributedTxMapping m,
+             ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings
+         ) {
+             this.m = m;
+             this.mappings = mappings;
+         }
+ 
+         /**
+          * @return Future ID.
+          */
+         IgniteUuid futureId() {
+             return futId;
+         }
+ 
+         /**
+          * @return Node ID.
+          */
+         public ClusterNode node() {
+             return m.node();
+         }
+ 
+         /**
+          * @return Keys.
+          */
+         public GridDistributedTxMapping mapping() {
+             return m;
+         }
+ 
+         /**
+          * @param e Error.
+          */
+         void onResult(Throwable e) {
+             if (rcvRes.compareAndSet(false, true)) {
+                 if (log.isDebugEnabled())
+                     log.debug("Failed to get future result [fut=" + this + ", err=" + e
+ ']');
+ 
+                 // Fail.
+                 onDone(e);
+             }
+             else
+                 U.warn(log, "Received error after another result has been processed [fut="
+
+                     GridNearOptimisticTxPrepareFuture.this + ", mini=" + this + ']', e);
+         }
+ 
+         /**
+          * @param e Node failure.
+          */
+         void onResult(ClusterTopologyCheckedException e) {
+             if (isDone())
+                 return;
+ 
+             if (rcvRes.compareAndSet(false, true)) {
+                 if (log.isDebugEnabled())
+                     log.debug("Remote node left grid while sending or waiting for reply
(will not retry): " + this);
+ 
+                 // Fail the whole future (make sure not to remap on different primary node
+                 // to prevent multiple lock coordinators).
+                 onError(null, null, e);
+             }
+         }
+ 
+         /**
+          * @param nodeId Failed node ID.
+          * @param res Result callback.
+          */
+         void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+             if (isDone())
+                 return;
+ 
+             if (rcvRes.compareAndSet(false, true)) {
+                 if (res.error() != null) {
+                     // Fail the whole compound future.
+                     onError(nodeId, mappings, res.error());
+                 }
+                 else {
+                     if (res.clientRemapVersion() != null) {
+                         assert cctx.kernalContext().clientNode();
+                         assert m.clientFirst();
+ 
+                         IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+ 
+                         if (affFut != null && !affFut.isDone()) {
+                             affFut.listen(new CI1<IgniteInternalFuture<?>>()
{
+                                 @Override public void apply(IgniteInternalFuture<?>
fut) {
+                                     remap();
+                                 }
+                             });
+                         }
+                         else
+                             remap();
+                     }
+                     else {
+                         onPrepareResponse(m, res);
+ 
+                         // Proceed prepare before finishing mini future.
+                         if (mappings != null)
+                             proceedPrepare(mappings);
+ 
+                         // Finish this mini future.
+                         onDone(tx);
+                     }
+                 }
+             }
+         }
+ 
+         /**
+          *
+          */
+         private void remap() {
+             prepareOnTopology(true, new Runnable() {
+                 @Override public void run() {
+                     onDone(tx);
+                 }
+             });
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(),
"err", error());
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b411b99,cb391e4..c40ac5e
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -83,8 -82,8 +82,11 @@@ public class GridNearTxLocal extends Gr
      private Map<IgniteTxKey, IgniteCacheExpiryPolicy> accessMap;
  
      /** */
 +    private boolean needCheckBackup;
 +
++    /** */
+     private boolean hasRemoteLocks;
+ 
      /**
       * Empty constructor required for {@link Externalizable}.
       */
@@@ -131,9 -130,8 +133,9 @@@
              concurrency,
              isolation,
              timeout,
-             invalidate,
+             false,
              storeEnabled,
 +            false,
              txSize,
              subjId,
              taskNameHash);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index ded5409,b418500..7b5cbf0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@@ -336,7 -393,13 +370,7 @@@ public class GridNearTxPrepareResponse 
  
                  writer.incrementState();
  
-             case 18:
+             case 16:
 -                if (!writer.writeCollection("pending", pending, MessageCollectionItemType.MSG))
 -                    return false;
 -
 -                writer.incrementState();
 -
 -            case 17:
                  if (!writer.writeMessage("retVal", retVal))
                      return false;
  
@@@ -414,7 -491,15 +462,7 @@@
  
                  reader.incrementState();
  
-             case 18:
 -            case 16:
 -                pending = reader.readCollection("pending", MessageCollectionItemType.MSG);
 -
 -                if (!reader.isLastRead())
 -                    return false;
 -
 -                reader.incrementState();
 -
+             case 17:
                  retVal = reader.readMessage("retVal");
  
                  if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index c3a5b1d,4ac81f8..d5d0205
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@@ -210,6 -178,11 +206,12 @@@ public class GridNearTxRemote extends G
          return false; // Serializable will be enforced on primary mode.
      }
  
+     /** {@inheritDoc} */
 -    @Override public GridCacheVersion ownedVersion(IgniteTxKey key) {
++    public GridCacheVersion ownedVersion(IgniteTxKey key) {
++        // TODO ignite-264 do we need this method?
+         return owned == null ? null : owned.get(key);
+     }
+ 
      /**
       * @return Near transaction ID.
       */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 1f175f6,e481e25..6b4ef82
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -195,19 -195,16 +195,17 @@@ public class IgniteTxHandler 
                      return fut;
                  }
              },
-             new C2<IgniteInternalTx, Exception, IgniteInternalTx>() {
-                 @Nullable @Override public IgniteInternalTx apply(IgniteInternalTx tx, Exception
e) {
+             new C2<GridNearTxPrepareResponse, Exception, GridNearTxPrepareResponse>()
{
+                 @Nullable @Override public GridNearTxPrepareResponse apply(GridNearTxPrepareResponse
res, Exception e) {
                      if (e != null) {
-                         // tx can be null of exception occurred.
-                         if (tx != null)
-                             tx.setRollbackOnly(); // Just in case.
+                         locTx.setRollbackOnly(); // Just in case.
  
 -                        if (!(e instanceof IgniteTxOptimisticCheckedException))
 -                            U.error(log, "Failed to prepare transaction: " + locTx, e);
 +                        if (!X.hasCause(e, IgniteTxOptimisticCheckedException.class) &&
 +                            !X.hasCause(e, IgniteFutureCancelledException.class))
-                             U.error(log, "Failed to prepare DHT transaction: " + tx, e);
++                            U.error(log, "Failed to prepare DHT transaction: " + locTx,
e);
                      }
  
-                     return tx;
+                     return res;
                  }
              }
          );
@@@ -755,15 -887,36 +857,38 @@@
          if (nearTx != null)
              finish(nodeId, nearTx, req);
  
-         if (dhtTx != null && !dhtTx.done()) {
-             dhtTx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>()
{
-                 @Override public void apply(IgniteInternalFuture<IgniteInternalTx>
igniteTxIgniteFuture) {
-                     sendReply(nodeId, req, true);
-                 }
-             });
+         if (req.replyRequired()) {
+             IgniteInternalFuture completeFut;
+ 
+             IgniteInternalFuture<IgniteInternalTx> dhtFin = dhtTx == null ? null :
dhtTx.done() ? null : dhtTx.finishFuture();
+             IgniteInternalFuture<IgniteInternalTx> nearFin = nearTx == null ? null
: nearTx.done() ? null : nearTx.finishFuture();
+ 
+             if (dhtFin != null && nearFin != null) {
+                 GridCompoundFuture fut = new GridCompoundFuture();
+ 
+                 fut.add(dhtFin);
+                 fut.add(nearFin);
+ 
+                 fut.markInitialized();
+ 
+                 completeFut = fut;
+             }
+             else
+                 completeFut = dhtFin != null ? dhtFin : nearFin;
+ 
+             if (completeFut != null) {
+                 completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>()
{
+                     @Override
+                     public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture)
{
+                         sendReply(nodeId, req);
+                     }
+                 });
+             }
+             else
+                 sendReply(nodeId, req);
          }
 +        else
-             sendReply(nodeId, req, true);
++            sendReply(nodeId, req);
      }
  
      /**
@@@ -870,30 -1030,23 +1001,33 @@@
       * @param nodeId Node id that originated finish request.
       * @param req Request.
       */
 -    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req) {
 -        GridCacheMessage res = new GridDhtTxFinishResponse(req.version(), req.futureId(),
req.miniId());
 +    protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed)
{
 +        if (req.replyRequired()) {
 +            GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(),
req.miniId());
  
 -        try {
 -            ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
 -        }
 -        catch (Throwable e) {
 -            // Double-check.
 -            if (ctx.discovery().node(nodeId) == null) {
 -                if (log.isDebugEnabled())
 -                    log.debug("Node left while sending finish response [nodeId=" + nodeId
+ ", res=" + res + ']');
 +            if (req.checkCommitted()) {
 +                res.checkCommitted(true);
 +
 +                if (!committed)
 +                    res.checkCommittedError(new IgniteTxRollbackCheckedException("Failed
to commit transaction " +
 +                        "(transaction has been rolled back on backup node): " + req.version()));
              }
 -            else
 -                U.error(log, "Failed to send finish response to node [nodeId=" + nodeId
+ ", res=" + res + ']', e);
  
 -            if (e instanceof Error)
 -                throw (Error)e;
 +            try {
 +                ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
 +            }
 +            catch (Throwable e) {
 +                // Double-check.
 +                if (ctx.discovery().node(nodeId) == null) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Node left while sending finish response [nodeId=" + nodeId
+ ", res=" + res + ']');
 +                }
 +                else
 +                    U.error(log, "Failed to send finish response to node [nodeId=" + nodeId
+ ", res=" + res + ']', e);
++
++                if (e instanceof Error)
++                    throw (Error)e;
 +            }
          }
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 55952c8,0a61b1a..9e4473b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -1237,8 -1320,10 +1289,9 @@@ public abstract class IgniteTxLocalAdap
                                  true,
                                  -1L,
                                  -1L,
-                                 null);
+                                 null,
+                                 skipStore);
  
 -
                              // As optimization, mark as checked immediately
                              // for non-pessimistic if value is not null.
                              if (val != null && !pessimistic())

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 5c12912,630330e..f3bca17
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@@ -359,12 -367,10 +366,11 @@@ public class IgniteTxManager extends Gr
          TransactionConcurrency concurrency,
          TransactionIsolation isolation,
          long timeout,
-         boolean invalidate,
          boolean storeEnabled,
 -        int txSize) {
 +        int txSize
 +    ) {
-         assert sysCacheCtx == null || sysCacheCtx.system();
-         
+         assert sysCacheCtx == null || sysCacheCtx.systemTx();
+ 
          UUID subjId = null; // TODO GG-9141 how to get subj ID?
  
          int taskNameHash = cctx.kernalContext().job().currentTaskNameHash();
@@@ -1099,9 -1222,15 +1126,6 @@@
          ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
  
          if (txIdMap.remove(tx.xidVersion(), tx)) {
-             // 3.1 Call dataStructures manager.
-             cctx.kernalContext().dataStructures().onTxCommitted(tx);
 -            // 2. Must process completed entries before unlocking!
 -            processCompletedEntries(tx);
 -
 -            if (tx instanceof GridDhtTxLocal) {
 -                GridDhtTxLocal dhtTxLoc = (GridDhtTxLocal)tx;
 -
 -                collectPendingVersions(dhtTxLoc);
 -            }
--
              // 4. Unlock write resources.
              unlockMultiple(tx, tx.writeEntries());
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMvccSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/c31ad7e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------


Mime
View raw message