ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1607 WIP
Date Fri, 02 Oct 2015 14:28:56 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 [created] 3ed37b267


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3ed37b26
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3ed37b26
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3ed37b26

Branch: refs/heads/ignite-1607
Commit: 3ed37b26715fa894079f657c1171a68613cb629f
Parents: fd091c8
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Oct 2 17:28:46 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Oct 2 17:28:46 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtCacheEntry.java      |   4 +-
 ...arOptimisticSerializableTxPrepareFuture.java | 849 +++++++++++++++++++
 .../cache/distributed/near/GridNearTxLocal.java |  18 +-
 .../cache/transactions/IgniteTxManager.java     |  16 +-
 .../transactions/TransactionIsolation.java      |   8 +-
 .../cache/CacheDeadlockFreeTxTest.java          | 484 +++++++++++
 .../junits/common/GridCommonAbstractTest.java   |  19 +-
 7 files changed, 1385 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index be2f3d3..bf22c7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -235,12 +235,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
 
             val = this.val;
 
-            if (mvcc != null && mvcc.isEmpty())
+            if (mvcc.isEmpty())
                 mvccExtras(null);
         }
 
         // Don't link reentries.
-        if (cand != null && !cand.reentry())
+        if (!cand.reentry())
             // Link with other candidates in the same thread.
             cctx.mvcc().addNext(cctx, cand);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
new file mode 100644
index 0000000..9056ae9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+
+/**
+ *
+ */
+public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPrepareFutureAdapter
+    implements GridCacheMvccFuture<IgniteInternalTx> {
+    /** */
+    @GridToStringInclude
+    private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+    /**
+     * @param cctx Context.
+     * @param tx Transaction.
+     */
+    public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+        super(cctx, tx);
+
+        assert tx.optimistic() : tx;
+
+        // Should wait for all mini futures completion before finishing tx.
+        ignoreChildFailures(IgniteCheckedException.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+        if (log.isDebugEnabled())
+            log.debug("Transaction future received owner changed callback: " + entry);
+
+        if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) {
+            lockKeys.remove(entry.txKey());
+
+            // This will check for locks.
+            onDone();
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+                if (isMini(f))
+                    return ((MiniFuture)f).node();
+
+                return cctx.discovery().localNode();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        boolean found = false;
+
+        for (IgniteInternalFuture<?> fut : futures()) {
+            if (isMini(fut)) {
+                MiniFuture f = (MiniFuture) fut;
+
+                if (f.node().id().equals(nodeId)) {
+                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+                        nodeId);
+
+                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                    f.onResult(e);
+
+                    found = true;
+                }
+            }
+        }
+
+        return found;
+    }
+
+    /**
+     * @param nodeId Failed node ID.
+     * @param e Error.
+     */
+    void onError(@Nullable UUID nodeId, Throwable e) {
+        if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
+            if (tx.onePhaseCommit()) {
+                tx.markForBackupCheck();
+
+                onComplete();
+
+                return;
+            }
+        }
+
+        if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null)
+            tx.onOptimisticException(nodeId);
+
+        if (err.compareAndSet(null, e))
+            tx.setRollbackOnly();
+    }
+
+    /**
+     * @return {@code True} if all locks are owned.
+     */
+    private boolean checkLocks() {
+        boolean locked = lockKeys.isEmpty();
+
+        if (locked) {
+            if (log.isDebugEnabled())
+                log.debug("All locks are acquired for near prepare future: " + this);
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
+        }
+
+        return locked;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+        if (!isDone()) {
+            for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+                if (isMini(fut)) {
+                    MiniFuture f = (MiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId())) {
+                        assert f.node().id().equals(nodeId);
+
+                        f.onResult(nodeId, res);
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
+        this.err.compareAndSet(null, err);
+
+        err = this.err.get();
+
+        // If locks were not acquired yet, delay completion.
+        if (isDone() || (err == null && !checkLocks()))
+            return false;
+
+        return onComplete();
+    }
+
+    /**
+     * @param f Future.
+     * @return {@code True} if mini-future.
+     */
+    private boolean isMini(IgniteInternalFuture<?> f) {
+        return f.getClass().equals(MiniFuture.class);
+    }
+
+    /**
+     * Completeness callback.
+     *
+     * @return {@code True} if future was finished by this call.
+     */
+    private boolean onComplete() {
+        Throwable err0 = err.get();
+
+        if (err0 == null || tx.needCheckBackup())
+            tx.state(PREPARED);
+
+        if (super.onDone(tx, err0)) {
+            // Don't forget to clean up.
+            cctx.mvcc().removeFuture(this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepare() {
+        // Obtain the topology version to use.
+        AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+        // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+        if (topVer == null && tx != null && tx.system()) {
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+            if (tx0 != null)
+                topVer = tx0.topologyVersionSnapshot();
+        }
+
+        if (topVer != null) {
+            tx.topologyVersion(topVer);
+
+            cctx.mvcc().addFuture(this);
+
+            prepare0(false);
+
+            return;
+        }
+
+        prepareOnTopology(false, null);
+    }
+
+    /**
+     * @param remap Remap flag.
+     * @param c Optional closure to run after map.
+     */
+    private void prepareOnTopology(final boolean remap, @Nullable final Runnable c) {
+        GridDhtTopologyFuture topFut = topologyReadLock();
+
+        AffinityTopologyVersion topVer = null;
+
+        try {
+            if (topFut == null) {
+                assert isDone();
+
+                return;
+            }
+
+            if (topFut.isDone()) {
+                topVer = topFut.topologyVersion();
+
+                if (remap)
+                    tx.onRemap(topVer);
+                else
+                    tx.topologyVersion(topVer);
+
+                if (!remap)
+                    cctx.mvcc().addFuture(this);
+            }
+        }
+        finally {
+            topologyReadUnlock();
+        }
+
+        if (topVer != null) {
+            StringBuilder invalidCaches = null;
+
+            for (Integer cacheId : tx.activeCacheIds()) {
+                GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+                assert ctx != null : cacheId;
+
+                Throwable err = topFut.validateCache(ctx);
+
+                if (err != null) {
+                    if (invalidCaches != null)
+                        invalidCaches.append(", ");
+                    else
+                        invalidCaches = new StringBuilder();
+
+                    invalidCaches.append(U.maskName(ctx.name()));
+                }
+            }
+
+            if (invalidCaches != null) {
+                onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+                    invalidCaches.toString()));
+
+                return;
+            }
+
+            prepare0(remap);
+
+            if (c != null)
+                c.run();
+        }
+        else {
+            topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override
+                public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                fut.get();
+
+                                prepareOnTopology(remap, c);
+                            } catch (IgniteCheckedException e) {
+                                onDone(e);
+                            } finally {
+                                cctx.txContextReset();
+                            }
+                        }
+                    });
+                }
+            });
+        }
+    }
+
+    /**
+     * Acquires topology read lock.
+     *
+     * @return Topology ready future.
+     */
+    private GridDhtTopologyFuture topologyReadLock() {
+        if (tx.activeCacheIds().isEmpty())
+            return cctx.exchange().lastTopologyFuture();
+
+        GridCacheContext<?, ?> nonLocCtx = null;
+
+        for (int cacheId : tx.activeCacheIds()) {
+            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+            if (!cacheCtx.isLocal()) {
+                nonLocCtx = cacheCtx;
+
+                break;
+            }
+        }
+
+        if (nonLocCtx == null)
+            return cctx.exchange().lastTopologyFuture();
+
+        nonLocCtx.topology().readLock();
+
+        if (nonLocCtx.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+                nonLocCtx.name()));
+
+            return null;
+        }
+
+        return nonLocCtx.topology().topologyVersionFuture();
+    }
+
+    /**
+     * Releases topology read lock.
+     */
+    private void topologyReadUnlock() {
+        if (!tx.activeCacheIds().isEmpty()) {
+            GridCacheContext<?, ?> nonLocCtx = null;
+
+            for (int cacheId : tx.activeCacheIds()) {
+                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+                if (!cacheCtx.isLocal()) {
+                    nonLocCtx = cacheCtx;
+
+                    break;
+                }
+            }
+
+            if (nonLocCtx != null)
+                nonLocCtx.topology().readUnlock();
+        }
+    }
+
+    /**
+     * Initializes future.
+     *
+     * @param remap Remap flag.
+     */
+    private void prepare0(boolean remap) {
+        try {
+            boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+
+            if (!txStateCheck) {
+                if (tx.setRollbackOnly()) {
+                    if (tx.timedOut())
+                        onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+                            "was rolled back: " + this));
+                    else
+                        onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
+                            "[state=" + tx.state() + ", tx=" + this + ']'));
+                }
+                else
+                    onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+                        "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+
+                return;
+            }
+
+            prepare(
+                tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(),
+                tx.writeEntries());
+
+            markInitialized();
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
+        }
+    }
+
+    /**
+     * @param reads Read entries.
+     * @param writes Write entries.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void prepare(
+        Iterable<IgniteTxEntry> reads,
+        Iterable<IgniteTxEntry> writes
+    ) throws IgniteCheckedException {
+        AffinityTopologyVersion topVer = tx.topologyVersion();
+
+        assert topVer.topologyVersion() > 0;
+
+        txMapping = new GridDhtTxMapping();
+
+        if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+            for (int cacheId : tx.activeCacheIds()) {
+                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+                if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
+                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " +
+                        "partition nodes left the grid): " + cacheCtx.name()));
+
+                    return;
+                }
+            }
+        }
+
+        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+
+        for (IgniteTxEntry read : reads)
+            map(read, topVer, mappings, false);
+
+        for (IgniteTxEntry write : writes)
+            map(write, topVer, mappings, true);
+
+        if (isDone()) {
+            if (log.isDebugEnabled())
+                log.debug("Abandoning (re)map because future is done: " + this);
+
+            return;
+        }
+
+        cctx.mvcc().recheckPendingLocks();
+
+        tx.addEntryMapping(mappings.values());
+
+        tx.transactionNodes(txMapping.transactionNodes());
+
+        checkOnePhase();
+
+        for (GridDistributedTxMapping m : mappings.values()) {
+            if (!prepare(m))
+                break;
+        }
+
+        markInitialized();
+    }
+
+    /**
+     * @param m Mapping.
+     * @return {@code False} if should stop mapping.
+     */
+    private boolean prepare(GridDistributedTxMapping m) {
+        assert !m.empty();
+
+        final ClusterNode n = m.node();
+
+        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+            futId,
+            tx.topologyVersion(),
+            tx,
+            tx.optimistic() && tx.serializable() ? m.reads() : null,
+            m.writes(),
+            m.near(),
+            txMapping.transactionNodes(),
+            m.last(),
+            m.lastBackups(),
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
+            m.explicitLock(),
+            tx.subjectId(),
+            tx.taskNameHash(),
+            m.clientFirst());
+
+        for (IgniteTxEntry txEntry : m.writes()) {
+            if (txEntry.op() == TRANSFORM)
+                req.addDhtVersion(txEntry.txKey(), null);
+        }
+
+        // Must lock near entries separately.
+        if (m.near()) {
+            try {
+                tx.optimisticLockEntries(req.writes());
+
+                tx.userPrepare();
+            }
+            catch (IgniteCheckedException e) {
+                onError(m.node().id(), e);
+
+                return false;
+            }
+        }
+
+        final MiniFuture fut = new MiniFuture(m);
+
+        req.miniId(fut.futureId());
+
+        add(fut); // Append new future.
+
+        // If this is the primary node for the keys.
+        if (n.isLocal()) {
+            IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+
+            prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+                @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                    try {
+                        fut.onResult(n.id(), prepFut.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut.onResult(e);
+                    }
+                }
+            });
+        }
+        else {
+            try {
+                cctx.io().send(n, req, tx.ioPolicy());
+            }
+            catch (ClusterTopologyCheckedException e) {
+                e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                fut.onResult(e);
+
+                return false;
+            }
+            catch (IgniteCheckedException e) {
+                fut.onResult(e);
+
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @param entry Transaction entry.
+     * @param topVer Topology version.
+     * @param curMapping Current mapping.
+     * @param waitLock Wait lock flag.
+     */
+    private void map(
+        IgniteTxEntry entry,
+        AffinityTopologyVersion topVer,
+        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+        boolean waitLock
+    ) {
+        GridCacheContext cacheCtx = entry.context();
+
+        List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+
+        txMapping.addMapping(nodes);
+
+        ClusterNode primary = F.first(nodes);
+
+        assert primary != null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Mapped key to primary node [key=" + entry.key() +
+                ", part=" + cacheCtx.affinity().partition(entry.key()) +
+                ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
+        }
+
+        // Must re-initialize cached entry while holding topology lock.
+        if (cacheCtx.isNear())
+            entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
+        else if (!cacheCtx.isLocal())
+            entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
+        else
+            entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
+
+        if (cacheCtx.isNear() || cacheCtx.isLocal()) {
+            if (waitLock && entry.explicitVersion() == null)
+                lockKeys.add(entry.txKey());
+        }
+
+        IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
+
+        GridDistributedTxMapping cur = curMapping.get(key);
+
+        if (cur == null) {
+            cur = new GridDistributedTxMapping(primary);
+
+            curMapping.put(key, cur);
+
+            if (primary.isLocal()) {
+                if (entry.context().isNear())
+                    tx.nearLocallyMapped(true);
+                else if (entry.context().isColocated())
+                    tx.colocatedLocallyMapped(true);
+            }
+
+            // Initialize near flag right away.
+            cur.near(cacheCtx.isNear());
+
+            cur.clientFirst(cctx.kernalContext().clientNode());
+
+            cur.last(true);
+        }
+
+        cur.add(entry);
+
+        if (entry.explicitVersion() != null) {
+            tx.markExplicit(primary.id());
+
+            cur.markExplicitLock();
+        }
+
+        entry.nodeId(primary.id());
+
+        if (cacheCtx.isNear()) {
+            while (true) {
+                try {
+                    GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
+
+                    cached.dhtNodeId(tx.xidVersion(), primary.id());
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    entry.cached(cacheCtx.near().entryEx(entry.key()));
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+            @Override public String apply(IgniteInternalFuture<?> f) {
+                return "[node=" + ((MiniFuture)f).node().id() +
+                    ", loc=" + ((MiniFuture)f).node().isLocal() +
+                    ", done=" + f.isDone() + "]";
+            }
+        });
+
+        return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this,
+            "innerFuts", futs,
+            "tx", tx,
+            "super", super.toString());
+    }
+
+    /**
+     *
+     */
+    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /** Keys. */
+        @GridToStringInclude
+        private GridDistributedTxMapping m;
+
+        /** Flag to signal some result being processed. */
+        private AtomicBoolean rcvRes = new AtomicBoolean(false);
+
+        /**
+         * @param m Mapping.
+         */
+        MiniFuture(GridDistributedTxMapping m) {
+            this.m = m;
+        }
+
+        /**
+         * @return Future ID.
+         */
+        IgniteUuid futureId() {
+            return futId;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        public ClusterNode node() {
+            return m.node();
+        }
+
+        /**
+         * @return Keys.
+         */
+        public GridDistributedTxMapping mapping() {
+            return m;
+        }
+
+        /**
+         * @param e Error.
+         */
+        void onResult(Throwable e) {
+            if (rcvRes.compareAndSet(false, true)) {
+                err.compareAndSet(null, e);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+                // Fail.
+                onDone(e);
+            }
+            else
+                U.warn(log, "Received error after another result has been processed [fut=" +
+                    GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e);
+        }
+
+        /**
+         * @param e Node failure.
+         */
+        void onResult(ClusterTopologyCheckedException e) {
+            if (isDone())
+                return;
+
+            if (rcvRes.compareAndSet(false, true)) {
+                if (log.isDebugEnabled())
+                    log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
+
+                onError(null, e);
+
+                onDone(e);
+            }
+        }
+
+        /**
+         * @param nodeId Failed node ID.
+         * @param res Result callback.
+         */
+        void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+            if (isDone())
+                return;
+
+            if (rcvRes.compareAndSet(false, true)) {
+                if (res.error() != null) {
+                    // Fail the whole compound future.
+                    onError(nodeId, res.error());
+
+                    onDone(res.error());
+                }
+                else {
+                    if (res.clientRemapVersion() != null) {
+                        assert cctx.kernalContext().clientNode();
+                        assert m.clientFirst();
+
+                        IgniteInternalFuture<?> affFut = cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+
+                        if (affFut != null && !affFut.isDone()) {
+                            affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    try {
+                                        fut.get();
+
+                                        remap();
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        onDone(e);
+                                    }
+                                }
+                            });
+                        }
+                        else
+                            remap();
+                    }
+                    else {
+                        onPrepareResponse(m, res);
+
+                        // Finish this mini future.
+                        onDone(tx);
+                    }
+                }
+            }
+        }
+
+        /**
+         *
+         */
+        private void remap() {
+            prepareOnTopology(true, new Runnable() {
+                @Override public void run() {
+                    onDone(tx);
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ea96649..ef9f77e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -557,6 +557,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
 
     /**
+     * TODO IGNITE-1607: remove this method?
+     *
      * Removes mapping in case of optimistic tx failure on primary node.
      *
      * @param failedNodeId Failed node ID.
@@ -586,6 +588,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /**
+     * @param nodeId Primary node id.
+     */
+    void onOptimisticException(UUID nodeId) {
+        mappings.remove(nodeId);
+    }
+
+    /**
      * @param nodeId Node ID to mark with explicit lock.
      * @return {@code True} if mapping was found.
      */
@@ -743,8 +752,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
 
         if (fut == null) {
             // Future must be created before any exception can be thrown.
-            fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, this) :
-                new GridNearPessimisticTxPrepareFuture(cctx, this);
+            if (optimistic()) {
+                fut = isolation() == TransactionIsolation.SERIALIZABLE_TRY_LOCK ?
+                    new GridNearOptimisticSerializableTxPrepareFuture(cctx, this) :
+                    new GridNearOptimisticTxPrepareFuture(cctx, this);
+            }
+            else
+                fut = new GridNearPessimisticTxPrepareFuture(cctx, this);
 
             if (!prepFut.compareAndSet(null, fut))
                 return prepFut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4074eee..d431cb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1628,12 +1628,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         throws IgniteCheckedException {
         assert tx.optimistic() || !tx.local();
 
-        long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout());
+        long timeout;
 
-        // For serializable transactions, failure to acquire lock means
-        // that there is a serializable conflict. For all other isolation levels,
-        // we wait for the lock.
-        long timeout = tx.timeout() == 0 ? 0 : remainingTime;
+        if (tx.isolation() != TransactionIsolation.SERIALIZABLE_TRY_LOCK) {
+            long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout());
+
+            // For serializable transactions, failure to acquire lock means
+            // that there is a serializable conflict. For all other isolation levels,
+            // we wait for the lock.
+            timeout = tx.timeout() == 0 ? 0 : remainingTime;
+        }
+        else
+            timeout = -1L;
 
         for (IgniteTxEntry txEntry1 : entries) {
             // Check if this entry was prepared before.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
index d7671f0..c43396c 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java
@@ -31,7 +31,10 @@ public enum TransactionIsolation {
     REPEATABLE_READ,
 
     /** Serializable isolation level. */
-    SERIALIZABLE;
+    SERIALIZABLE,
+
+    /** TODO IGNITE-1607 */
+    SERIALIZABLE_TRY_LOCK;
 
     /** Enum values. */
     private static final TransactionIsolation[] VALS = values();
@@ -42,8 +45,7 @@ public enum TransactionIsolation {
      * @param ord Ordinal value.
      * @return Enumerated value or {@code null} if ordinal out of range.
      */
-    @Nullable
-    public static TransactionIsolation fromOrdinal(int ord) {
+    @Nullable public static TransactionIsolation fromOrdinal(int ord) {
         return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java
new file mode 100644
index 0000000..f156978
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionOptimisticException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE_TRY_LOCK;
+
+/**
+ *
+ */
+public class CacheDeadlockFreeTxTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRVS = 3;
+
+    /** */
+    private static final int CLIENTS = 3;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        client = true;
+
+        startGridsMultiThreaded(SRVS, CLIENTS);
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked1() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        final IgniteCache<Integer, Integer> cache =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true));
+
+        final Integer key = nearKey(cache);
+
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, 1);
+
+                    log.info("Locked key: " + key);
+
+                    latch1.countDown();
+
+                    assertTrue(latch2.await(10, SECONDS));
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "lock-thread");
+
+        assertTrue(latch1.await(10, SECONDS));
+
+        try {
+            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+                cache.put(key, 2);
+
+                log.info("Commit");
+
+                tx.commit();
+            }
+
+            fail();
+        }
+        catch (TransactionOptimisticException e) {
+            log.info("Expected exception: " + e);
+        }
+
+        latch2.countDown();
+
+        fut.get();
+
+        assertEquals(1, (Object)cache.get(key));
+
+        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+            cache.put(key, 2);
+
+            tx.commit();
+        }
+
+        assertEquals(2, (Object)cache.get(key));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked2() throws Exception {
+        rollbackIfLockedPartialLock(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxRollbackIfLocked3() throws Exception {
+        rollbackIfLockedPartialLock(true);
+    }
+
+    /**
+     * @param locKey If {@code true} gets lock for local key.
+     * @throws Exception If failed.
+     */
+    public void rollbackIfLockedPartialLock(boolean locKey) throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        final IgniteTransactions txs = ignite0.transactions();
+
+        final IgniteCache<Integer, Integer> cache =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true));
+
+        final Integer key1 = primaryKey(ignite(1).cache(cache.getName()));
+        final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName()));
+
+        final CountDownLatch latch1 = new CountDownLatch(1);
+        final CountDownLatch latch2 = new CountDownLatch(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key1, 1);
+
+                    log.info("Locked key: " + key1);
+
+                    latch1.countDown();
+
+                    assertTrue(latch2.await(10, SECONDS));
+
+                    log.info("Commit1");
+
+                    tx.commit();
+                }
+
+                return null;
+            }
+        }, "lock-thread");
+
+        assertTrue(latch1.await(10, SECONDS));
+
+        try {
+            try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+                cache.put(key1, 2);
+                cache.put(key2, 2);
+
+                log.info("Commit2");
+
+                tx.commit();
+            }
+
+            fail();
+        }
+        catch (TransactionOptimisticException e) {
+            log.info("Expected exception: " + e);
+        }
+
+        latch2.countDown();
+
+        fut.get();
+
+        assertEquals(1, (Object) cache.get(key1));
+        assertNull(cache.get(key2));
+
+        try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+            cache.put(key1, 2);
+            cache.put(key2, 2);
+
+            log.info("Commit3");
+
+            tx.commit();
+        }
+
+        assertEquals(2, (Object) cache.get(key2));
+        assertEquals(2, (Object) cache.get(key2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlock() throws Exception {
+        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception {
+        concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockClients() throws Exception {
+        concurrentUpdateNoDeadlock(clients(), 20, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception {
+        concurrentUpdateNoDeadlock(clients(), 20, true);
+    }
+
+    /**
+     * @return Client nodes.
+     */
+    private List<Ignite> clients() {
+        List<Ignite> clients = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS; i++) {
+            Ignite ignite = ignite(SRVS + i);
+
+            assertTrue(ignite.configuration().isClientMode());
+
+            clients.add(ignite);
+        }
+
+        return clients;
+    }
+
+    /**
+     * @param updateNodes Nodes executing updates.
+     * @param threads Number of threads executing updates.
+     * @param restart If {@code true} restarts one node.
+     * @throws Exception If failed.
+     */
+    private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes,
+        int threads,
+        final boolean restart) throws Exception {
+        assert updateNodes.size() > 0;
+
+        final Ignite ignite0 = ignite(0);
+
+        final String cacheName =
+            ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName();
+
+        final int KEYS = 100;
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteInternalFuture<Object> fut = null;
+
+        try {
+            if (restart) {
+                fut = GridTestUtils.runAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        while (!finished.get()) {
+                            stopGrid(0);
+
+                            U.sleep(300);
+
+                            Ignite ignite = startGrid(0);
+
+                            assertFalse(ignite.configuration().isClientMode());
+                        }
+
+                        return null;
+                    }
+                });
+            }
+
+            for (int i = 0; i < 10; i++) {
+                log.info("Iteration: " + i);
+
+                final long stopTime = U.currentTimeMillis() + 5_000;
+
+                final AtomicInteger idx = new AtomicInteger();
+
+                IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int nodeIdx = idx.getAndIncrement() % updateNodes.size();
+
+                        Ignite node = updateNodes.get(nodeIdx);
+
+                        log.info("Tx thread: " + node.name());
+
+                        final IgniteTransactions txs = node.transactions();
+
+                        final IgniteCache<Integer, Integer> cache = node.cache(cacheName);
+
+                        assertNotNull(cache);
+
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                        while (U.currentTimeMillis() < stopTime) {
+                            final Map<Integer, Integer> keys = new LinkedHashMap<>();
+
+                            for (int i = 0; i < KEYS / 2; i++)
+                                keys.put(rnd.nextInt(KEYS), rnd.nextInt());
+
+                            try {
+                                if (restart) {
+                                    doInTransaction(node, OPTIMISTIC, REPEATABLE_READ, new Callable<Void>() {
+                                        @Override public Void call() throws Exception {
+                                            cache.putAll(keys);
+
+                                            return null;
+                                        }
+                                    });
+                                } else {
+                                    try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) {
+                                        cache.putAll(keys);
+
+                                        tx.commit();
+                                    }
+                                }
+                            } catch (TransactionOptimisticException ignore) {
+                                // No-op.
+                            } catch (Throwable e) {
+                                log.error("Unexpected error: " + e, e);
+
+                                throw e;
+                            }
+                        }
+
+                        return null;
+                    }
+                }, threads, "tx-thread");
+
+                updateFut.get(60, SECONDS);
+
+                IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName);
+
+                for (int key = 0; key < KEYS; key++) {
+                    Integer val = cache.get(key);
+
+                    for (int node = 1; node < SRVS + CLIENTS; node++)
+                        assertEquals(val, ignite(node).cache(cache.getName()).get(key));
+                }
+            }
+
+            finished.set(true);
+
+            if (fut != null)
+                fut.get();
+        }
+        finally {
+            finished.set(true);
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param storeEnabled If {@code true} adds cache store.
+     * @param nearCache If {@code true} near cache is enabled.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        boolean storeEnabled,
+        boolean nearCache) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(backups);
+        ccfg.setWriteSynchronizationMode(syncMode);
+
+        if (storeEnabled) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setWriteThrough(true);
+        }
+
+        if (nearCache)
+            ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Integer, Integer> create() {
+            return new CacheStoreAdapter<Integer, Integer>() {
+                @Override public Integer load(Integer key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3ed37b26/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 4bcf51e..8c9302f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -77,6 +77,8 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.junits.GridAbstractTest;
 import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionRollbackException;
 import org.jetbrains.annotations.Nullable;
 
@@ -1021,8 +1023,23 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      * @throws Exception If failed.
      */
     protected <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+        return doInTransaction(ignite, PESSIMISTIC, REPEATABLE_READ, clo);
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     * @param concurrency Transaction concurrency.
+     * @param isolation Transaction isolation.
+     * @param clo Closure.
+     * @return Result of closure execution.
+     * @throws Exception If failed.
+     */
+    protected <T> T doInTransaction(Ignite ignite,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation,
+        Callable<T> clo) throws Exception {
         while (true) {
-            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            try (Transaction tx = ignite.transactions().txStart(concurrency, isolation)) {
                 T res = clo.call();
 
                 tx.commit();


Mime
View raw message