ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [02/49] incubator-ignite git commit: #ignite-9655 - Manual merge of changes.
Date Sun, 15 Feb 2015 08:18:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index b14d45d..df2e62b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -156,9 +156,6 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
     /** */
     private Set<Integer> invalidParts = new GridLeanSet<>();
 
-    /** Recover writes. */
-    private Collection<IgniteTxEntry<K, V>> recoveryWrites;
-
     /**
      * Transaction state. Note that state is not protected, as we want to
      * always use {@link #state()} and {@link #state(IgniteTxState)}
@@ -189,6 +186,9 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
     /** Lock condition. */
     private final Condition cond = lock.newCondition();
 
+    /** Transaction nodes. */
+    protected Map<UUID, Collection<UUID>> txNodes;
+
     /** Subject ID initiated this transaction. */
     protected UUID subjId;
 
@@ -357,8 +357,6 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Override public Collection<IgniteTxEntry<K, V>> optimisticLockEntries() {
-        assert optimistic();
-
         if (!groupLock())
             return writeEntries();
         else {
@@ -385,20 +383,6 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
         }
     }
 
-    /**
-     * @param recoveryWrites Recover write entries.
-     */
-    public void recoveryWrites(Collection<IgniteTxEntry<K, V>> recoveryWrites) {
-        this.recoveryWrites = recoveryWrites;
-    }
-
-    /**
-     * @return Recover write entries.
-     */
-    @Override public Collection<IgniteTxEntry<K, V>> recoveryWrites() {
-        return recoveryWrites;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean storeEnabled() {
         return storeEnabled;
@@ -1163,7 +1147,14 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Nullable @Override public Map<UUID, Collection<UUID>> transactionNodes() {
-        return null;
+        return txNodes;
+    }
+
+    /**
+     * @param txNodes Transaction nodes.
+     */
+    public void transactionNodes(Map<UUID, Collection<UUID>> txNodes) {
+        this.txNodes = txNodes;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 541e214..ca69a5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -131,9 +131,6 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
     /** Group lock entry flag. */
     private boolean grpLock;
 
-    /** Flag indicating if this entry should be transferred to remote node. */
-    private boolean transferRequired;
-
     /** Deployment enabled flag. */
     private boolean depEnabled;
 
@@ -280,20 +277,6 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable,
     }
 
     /**
-     * @param transferRequired Sets flag indicating that transfer is required to remote node.
-     */
-    public void transferRequired(boolean transferRequired) {
-        this.transferRequired = transferRequired;
-    }
-
-    /**
-     * @return Flag indicating whether transfer is required to remote nodes.
-     */
-    public boolean transferRequired() {
-        return transferRequired;
-    }
-
-    /**
      * @param ctx Context.
      * @return Clean copy of this entry.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
index 321cd44..d6cec9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEx.java
@@ -303,13 +303,6 @@ public interface IgniteTxEx<K, V> extends IgniteTx, GridTimeoutObject {
     public Map<IgniteTxKey<K>, IgniteTxEntry<K, V>> readMap();
 
     /**
-     * Gets pessimistic recovery writes, i.e. values that have never been sent to remote nodes with lock requests.
-     *
-     * @return Collection of recovery writes.
-     */
-    public Collection<IgniteTxEntry<K, V>> recoveryWrites();
-
-    /**
      * Gets a list of entries that needs to be locked on the next step of prepare stage of
      * optimistic transaction.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a3c66c9..fa1e345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -125,20 +125,6 @@ public class IgniteTxHandler<K, V> {
                     processCheckPreparedTxResponse(nodeId, res);
                 }
             });
-
-        ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxRequest.class,
-            new CI2<UUID, GridCachePessimisticCheckCommittedTxRequest<K, V>>() {
-                @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
-                    processCheckCommittedTxRequest(nodeId, req);
-                }
-            });
-
-        ctx.io().addHandler(0, GridCachePessimisticCheckCommittedTxResponse.class,
-            new CI2<UUID, GridCachePessimisticCheckCommittedTxResponse<K, V>>() {
-                @Override public void apply(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
-                    processCheckCommittedTxResponse(nodeId, res);
-                }
-            });
     }
 
     /**
@@ -248,6 +234,10 @@ public class IgniteTxHandler<K, V> {
             if (tx == null)
                 U.warn(log, "Missing local transaction for mapped near version [nearVer=" + req.version()
                     + ", mappedVer=" + mappedVer + ']');
+            else {
+                if (req.concurrency() == PESSIMISTIC)
+                    tx.nearFutureId(req.futureId());
+            }
         }
         else {
             tx = new GridDhtTxLocal<>(
@@ -257,8 +247,8 @@ public class IgniteTxHandler<K, V> {
                 req.futureId(),
                 req.miniId(),
                 req.threadId(),
-                /*implicit*/false,
-                /*implicit-single*/false,
+                req.implicitSingle(),
+                req.implicitSingle(),
                 req.system(),
                 req.concurrency(),
                 req.isolation(),
@@ -279,10 +269,22 @@ public class IgniteTxHandler<K, V> {
                 tx.topologyVersion(req.topologyVersion());
             else
                 U.warn(log, "Failed to create local transaction (was transaction rolled back?) [xid=" +
-                    tx.xid() + ", req=" + req + ']');
+                    req.version() + ", req=" + req + ']');
         }
 
         if (tx != null) {
+            tx.transactionNodes(req.transactionNodes());
+
+            if (req.onePhaseCommit()) {
+                assert req.last();
+                assert F.isEmpty(req.lastBackups()) || req.lastBackups().size() <= 1;
+
+                tx.onePhaseCommit(true);
+            }
+
+            if (req.returnValue())
+                tx.needReturnValue(true);
+
             IgniteFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(),
                 req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(),
                 req.lastBackups());
@@ -327,8 +329,7 @@ public class IgniteTxHandler<K, V> {
             .<IgniteTxEx<K, V>>future(res.version(), res.futureId());
 
         if (fut == null) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']');
+            U.warn(log, "Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']');
 
             return;
         }
@@ -428,12 +429,8 @@ public class IgniteTxHandler<K, V> {
 
         IgniteFuture<IgniteTx> nearFinishFut = null;
 
-        if (locTx == null || locTx.nearLocallyMapped()) {
-            if (locTx != null)
-                req.cloneEntries();
-
+        if (locTx == null || locTx.nearLocallyMapped())
             nearFinishFut = finishDhtLocal(nodeId, locTx, req);
-        }
 
         if (colocatedFinishFut != null && nearFinishFut != null) {
             GridCompoundFuture<IgniteTx, IgniteTx> res = new GridCompoundFuture<>(ctx.kernalContext());
@@ -547,20 +544,6 @@ public class IgniteTxHandler<K, V> {
 
                 tx.nearFinishFutureId(req.futureId());
                 tx.nearFinishMiniId(req.miniId());
-                tx.recoveryWrites(req.recoveryWrites());
-
-                Collection<IgniteTxEntry<K, V>> writeEntries = req.writes();
-
-                if (!F.isEmpty(writeEntries)) {
-                    // In OPTIMISTIC mode, we get the values at PREPARE stage.
-                    assert tx.concurrency() == PESSIMISTIC;
-
-                    for (IgniteTxEntry<K, V> entry : writeEntries)
-                        tx.addEntry(req.messageId(), entry);
-                }
-
-                if (tx.pessimistic())
-                    tx.prepare();
 
                 IgniteFuture<IgniteTx> commitFut = tx.commitAsync();
 
@@ -640,6 +623,8 @@ public class IgniteTxHandler<K, V> {
         assert nodeId != null;
         assert req != null;
 
+        assert req.transactionNodes() != null;
+
         if (log.isDebugEnabled())
             log.debug("Processing dht tx prepare request [locNodeId=" + ctx.localNodeId() +
                 ", nodeId=" + nodeId + ", req=" + req + ']');
@@ -662,13 +647,29 @@ public class IgniteTxHandler<K, V> {
 
             if (dhtTx != null && !F.isEmpty(dhtTx.invalidPartitions()))
                 res.invalidPartitions(dhtTx.invalidPartitions());
+
+            if (req.onePhaseCommit()) {
+                assert req.last();
+
+                if (dhtTx != null) {
+                    dhtTx.onePhaseCommit(true);
+
+                    finish(nodeId, dhtTx, req);
+                }
+
+                if (nearTx != null) {
+                    nearTx.onePhaseCommit(true);
+
+                    finish(nodeId, nearTx, req);
+                }
+            }
         }
         catch (IgniteCheckedException e) {
             if (e instanceof IgniteTxRollbackException)
-                U.error(log, "Transaction was rolled back before prepare completed: " + dhtTx, e);
+                U.error(log, "Transaction was rolled back before prepare completed: " + req, e);
             else if (e instanceof IgniteTxOptimisticException) {
                 if (log.isDebugEnabled())
-                    log.debug("Optimistic failure for remote transaction (will rollback): " + dhtTx);
+                    log.debug("Optimistic failure for remote transaction (will rollback): " + req);
             }
             else
                 U.error(log, "Failed to process prepare request: " + req, e);
@@ -676,9 +677,6 @@ public class IgniteTxHandler<K, V> {
             if (nearTx != null)
                 nearTx.rollback();
 
-            if (dhtTx != null)
-                dhtTx.rollback();
-
             res = new GridDhtTxPrepareResponse<>(req.version(), req.futureId(), req.miniId(), e);
         }
 
@@ -719,84 +717,36 @@ public class IgniteTxHandler<K, V> {
         GridDhtTxRemote<K, V> dhtTx = ctx.tm().tx(req.version());
         GridNearTxRemote<K, V> nearTx = ctx.tm().nearTx(req.version());
 
-        try {
-            if (dhtTx == null && !F.isEmpty(req.writes()))
-                dhtTx = startRemoteTxForFinish(nodeId, req);
-
-            if (dhtTx != null) {
-                dhtTx.syncCommit(req.syncCommit());
-                dhtTx.syncRollback(req.syncRollback());
-            }
-
-            // One-phase commit transactions send finish requests to backup nodes.
-            if (dhtTx != null && req.onePhaseCommit()) {
-                dhtTx.onePhaseCommit(true);
-
-                dhtTx.writeVersion(req.writeVersion());
-            }
-
-            if (nearTx == null && !F.isEmpty(req.nearWrites()) && req.groupLock())
-                nearTx = startNearRemoteTxForFinish(nodeId, req);
-
-            if (nearTx != null) {
-                nearTx.syncCommit(req.syncCommit());
-                nearTx.syncRollback(req.syncRollback());
-            }
-        }
-        catch (IgniteTxRollbackException e) {
-            if (log.isDebugEnabled())
-                log.debug("Received finish request for completed transaction (will ignore) [req=" + req + ", err=" +
-                    e.getMessage() + ']');
-
-            sendReply(nodeId, req);
-
-            return;
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to start remote DHT and Near transactions (will invalidate transactions) [dhtTx=" +
-                dhtTx + ", nearTx=" + nearTx + ']', e);
-
-            if (dhtTx != null)
-                dhtTx.invalidate(true);
-
-            if (nearTx != null)
-                nearTx.invalidate(true);
-        }
-        catch (GridDistributedLockCancelledException ignore) {
-            U.warn(log, "Received commit request to cancelled lock (will invalidate transaction) [dhtTx=" +
-                dhtTx + ", nearTx=" + nearTx + ']');
-
-            if (dhtTx != null)
-                dhtTx.invalidate(true);
-
-            if (nearTx != null)
-                nearTx.invalidate(true);
-        }
-
         // Safety - local transaction will finish explicitly.
         if (nearTx != null && nearTx.local())
             nearTx = null;
 
-        finish(nodeId, dhtTx, req, req.writes(), req.ttls());
+        finish(nodeId, dhtTx, req, req.ttls());
 
         if (nearTx != null)
-            finish(nodeId, nearTx, req, req.nearWrites(), req.nearTtls());
+            finish(nodeId, nearTx, req, req.nearTtls());
 
-        sendReply(nodeId, req);
+        if (dhtTx != null && !dhtTx.done()) {
+            dhtTx.finishFuture().listenAsync(new CI1<IgniteFuture<IgniteTx>>() {
+                @Override public void apply(IgniteFuture<IgniteTx> igniteTxIgniteFuture) {
+                    sendReply(nodeId, req);
+                }
+            });
+        }
+        else
+            sendReply(nodeId, req);
     }
 
     /**
      * @param nodeId Node ID.
      * @param tx Transaction.
      * @param req Request.
-     * @param writes Writes.
      * @param ttls TTLs for optimistic transaction.
      */
     protected void finish(
         UUID nodeId,
         IgniteTxRemoteEx<K, V> tx,
         GridDhtTxFinishRequest<K, V> req,
-        Collection<IgniteTxEntry<K, V>> writes,
         @Nullable GridLongList ttls) {
         // We don't allow explicit locks for transactions and
         // therefore immediately return if transaction is null.
@@ -827,21 +777,7 @@ public class IgniteTxHandler<K, V> {
                     tx.invalidate(req.isInvalidate());
                     tx.systemInvalidate(req.isSystemInvalidate());
 
-                    if (!F.isEmpty(writes)) {
-                        // In OPTIMISTIC mode, we get the values at PREPARE stage.
-                        assert tx.concurrency() == PESSIMISTIC;
-
-                        for (IgniteTxEntry<K, V> entry : writes) {
-                            if (log.isDebugEnabled())
-                                log.debug("Unmarshalled transaction entry from pessimistic transaction [key=" +
-                                    entry.key() + ", value=" + entry.value() + ", tx=" + tx + ']');
-
-                            if (!tx.setWriteValue(entry))
-                                U.warn(log, "Received entry to commit that was not present in transaction [entry=" +
-                                    entry + ", tx=" + tx + ']');
-                        }
-                    }
-                    else if (tx.concurrency() == OPTIMISTIC && ttls != null) {
+                    if (tx.concurrency() == OPTIMISTIC && ttls != null) {
                         int idx = 0;
 
                         for (IgniteTxEntry<K, V> e : tx.writeEntries())
@@ -851,15 +787,10 @@ public class IgniteTxHandler<K, V> {
                     // Complete remote candidates.
                     tx.doneRemote(req.baseVersion(), null, null, null);
 
-                    if (tx.pessimistic())
-                        tx.prepare();
-
                     tx.commit();
                 }
             }
             else {
-                assert tx != null;
-
                 tx.doneRemote(req.baseVersion(), null, null, null);
 
                 tx.rollback();
@@ -882,6 +813,37 @@ public class IgniteTxHandler<K, V> {
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param tx Transaction.
+     * @param req Request.
+     */
+    protected void finish(
+        UUID nodeId,
+        GridDistributedTxRemoteAdapter<K, V> tx,
+        GridDhtTxPrepareRequest<K, V> req) {
+        assert tx != null : "No transaction for one-phase commit prepare request: " + req;
+
+        try {
+            tx.commitVersion(req.writeVersion());
+            tx.invalidate(req.isInvalidate());
+
+            // Complete remote candidates.
+            tx.doneRemote(req.version(), null, null, null);
+
+            tx.commit();
+        }
+        catch (Throwable e) {
+            U.error(log, "Failed committing transaction [tx=" + tx + ']', e);
+
+            // Mark transaction for invalidate.
+            tx.invalidate(true);
+            tx.systemInvalidate(true);
+
+            tx.rollback();
+        }
+    }
+
+    /**
      * Sends tx finish response to remote node, if response is requested.
      *
      * @param nodeId Node id that originated finish request.
@@ -913,9 +875,11 @@ public class IgniteTxHandler<K, V> {
      * @return Remote transaction.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable GridDhtTxRemote<K, V> startRemoteTx(UUID nodeId,
+    @Nullable GridDhtTxRemote<K, V> startRemoteTx(
+        UUID nodeId,
         GridDhtTxPrepareRequest<K, V> req,
-        GridDhtTxPrepareResponse<K, V> res) throws IgniteCheckedException {
+        GridDhtTxPrepareResponse<K, V> res
+    ) throws IgniteCheckedException {
         if (!F.isEmpty(req.writes())) {
             GridDhtTxRemote<K, V> tx = ctx.tm().tx(req.version());
 
@@ -930,7 +894,7 @@ public class IgniteTxHandler<K, V> {
                     req.threadId(),
                     req.topologyVersion(),
                     req.version(),
-                    req.commitVersion(),
+                    null,
                     req.system(),
                     req.concurrency(),
                     req.isolation(),
@@ -943,6 +907,8 @@ public class IgniteTxHandler<K, V> {
                     req.subjectId(),
                     req.taskNameHash());
 
+                tx.writeVersion(req.writeVersion());
+
                 tx = ctx.tm().onCreated(tx);
 
                 if (tx == null || !ctx.tm().onStarted(tx)) {
@@ -1047,7 +1013,7 @@ public class IgniteTxHandler<K, V> {
                     req.nearNodeId(),
                     req.threadId(),
                     req.version(),
-                    req.commitVersion(),
+                    null,
                     req.system(),
                     req.concurrency(),
                     req.isolation(),
@@ -1060,6 +1026,8 @@ public class IgniteTxHandler<K, V> {
                     req.taskNameHash()
                 );
 
+                tx.writeVersion(req.writeVersion());
+
                 if (!tx.empty()) {
                     tx = ctx.tm().onCreated(tx);
 
@@ -1076,6 +1044,9 @@ public class IgniteTxHandler<K, V> {
             // in prepare phase will get properly ordered as well.
             tx.prepare();
 
+            if (req.last())
+                tx.state(PREPARED);
+
             return tx;
         }
 
@@ -1083,312 +1054,6 @@ public class IgniteTxHandler<K, V> {
     }
 
     /**
-     * @param nodeId Primary node ID.
-     * @param req Request.
-     * @return Remote transaction.
-     * @throws IgniteCheckedException If failed.
-     * @throws GridDistributedLockCancelledException If lock has been cancelled.
-     */
-    @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable GridDhtTxRemote<K, V> startRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
-        throws IgniteCheckedException, GridDistributedLockCancelledException {
-
-        GridDhtTxRemote<K, V> tx = null;
-
-        boolean marked = false;
-
-        for (IgniteTxEntry<K, V> txEntry : req.writes()) {
-            GridDistributedCacheEntry<K, V> entry = null;
-
-            GridCacheContext<K, V> cacheCtx = txEntry.context();
-
-            while (true) {
-                try {
-                    int part = cacheCtx.affinity().partition(txEntry.key());
-
-                    GridDhtLocalPartition<K, V> locPart = cacheCtx.topology().localPartition(part,
-                        req.topologyVersion(), false);
-
-                    // Handle implicit locks for pessimistic transactions.
-                    if (tx == null)
-                        tx = ctx.tm().tx(req.version());
-
-                    if (locPart == null || !locPart.reserve()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Local partition for given key is already evicted (will remove from tx) " +
-                                "[key=" + txEntry.key() + ", part=" + part + ", locPart=" + locPart + ']');
-
-                        if (tx != null)
-                            tx.clearEntry(txEntry.txKey());
-
-                        break;
-                    }
-
-                    try {
-                        entry = (GridDistributedCacheEntry<K, V>)cacheCtx.cache().entryEx(txEntry.key(),
-                            req.topologyVersion());
-
-                        if (tx == null) {
-                            tx = new GridDhtTxRemote<>(
-                                ctx,
-                                req.nearNodeId(),
-                                req.futureId(),
-                                nodeId,
-                                // We can pass null as nearXidVersion as transaction will be committed right away.
-                                null,
-                                req.threadId(),
-                                req.topologyVersion(),
-                                req.version(),
-                                /*commitVer*/null,
-                                req.system(),
-                                PESSIMISTIC,
-                                req.isolation(),
-                                req.isInvalidate(),
-                                0,
-                                req.txSize(),
-                                req.groupLockKey(),
-                                req.subjectId(),
-                                req.taskNameHash());
-
-                            tx = ctx.tm().onCreated(tx);
-
-                            if (tx == null || !ctx.tm().onStarted(tx))
-                                throw new IgniteTxRollbackException("Failed to acquire lock " +
-                                    "(transaction has been completed): " + req.version());
-                        }
-
-                        tx.addWrite(cacheCtx,
-                            txEntry.op(),
-                            txEntry.txKey(),
-                            txEntry.keyBytes(),
-                            txEntry.value(),
-                            txEntry.valueBytes(),
-                            txEntry.entryProcessors(),
-                            txEntry.drVersion(),
-                            txEntry.ttl());
-
-                        if (!marked) {
-                            if (tx.markFinalizing(USER_FINISH))
-                                marked = true;
-                            else {
-                                tx.clearEntry(txEntry.txKey());
-
-                                return null;
-                            }
-                        }
-
-                        // Add remote candidate before reordering.
-                        if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
-                            entry.addRemote(
-                                req.nearNodeId(),
-                                nodeId,
-                                req.threadId(),
-                                req.version(),
-                                0,
-                                /*tx*/true,
-                                tx.implicitSingle(),
-                                null
-                            );
-
-                        // Double-check in case if sender node left the grid.
-                        if (ctx.discovery().node(req.nearNodeId()) == null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
-
-                            tx.rollback();
-
-                            return null;
-                        }
-
-                        // Entry is legit.
-                        break;
-                    }
-                    finally {
-                        locPart.release();
-                    }
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
-                        entry;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
-
-                    tx.clearEntry(txEntry.txKey());
-
-                    if (log.isDebugEnabled())
-                        log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
-                            entry + ", tx=" + tx + ']');
-                }
-                catch (GridDhtInvalidPartitionException p) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received invalid partition (will clear entry from tx) [part=" + p + ", req=" +
-                            req + ", txEntry=" + txEntry + ']');
-
-                    if (tx != null)
-                        tx.clearEntry(txEntry.txKey());
-
-                    break;
-                }
-            }
-        }
-
-        if (tx != null && tx.empty()) {
-            tx.rollback();
-
-            return null;
-        }
-
-        return tx;
-    }
-
-    /**
-     * @param nodeId Primary node ID.
-     * @param req Request.
-     * @return Remote transaction.
-     * @throws IgniteCheckedException If failed.
-     * @throws GridDistributedLockCancelledException If lock has been cancelled.
-     */
-    @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable public GridNearTxRemote<K, V> startNearRemoteTxForFinish(UUID nodeId, GridDhtTxFinishRequest<K, V> req)
-        throws IgniteCheckedException, GridDistributedLockCancelledException {
-        assert req.groupLock();
-
-        GridNearTxRemote<K, V> tx = null;
-
-        ClassLoader ldr = ctx.deploy().globalLoader();
-
-        if (ldr != null) {
-            boolean marked = false;
-
-            for (IgniteTxEntry<K, V> txEntry : req.nearWrites()) {
-                GridDistributedCacheEntry<K, V> entry = null;
-
-                GridCacheContext<K, V> cacheCtx = txEntry.context();
-
-                while (true) {
-                    try {
-                        entry = cacheCtx.near().peekExx(txEntry.key());
-
-                        if (entry != null) {
-                            entry.keyBytes(txEntry.keyBytes());
-
-                            // Handle implicit locks for pessimistic transactions.
-                            if (tx == null)
-                                tx = ctx.tm().nearTx(req.version());
-
-                            if (tx == null) {
-                                tx = new GridNearTxRemote<>(
-                                    ctx,
-                                    nodeId,
-                                    req.nearNodeId(),
-                                    // We can pass null as nearXidVer as transaction will be committed right away.
-                                    null,
-                                    req.threadId(),
-                                    req.version(),
-                                    null,
-                                    req.system(),
-                                    PESSIMISTIC,
-                                    req.isolation(),
-                                    req.isInvalidate(),
-                                    0,
-                                    req.txSize(),
-                                    req.groupLockKey(),
-                                    req.subjectId(),
-                                    req.taskNameHash());
-
-                                tx = ctx.tm().onCreated(tx);
-
-                                if (tx == null || !ctx.tm().onStarted(tx))
-                                    throw new IgniteTxRollbackException("Failed to acquire lock " +
-                                        "(transaction has been completed): " + req.version());
-
-                                if (!marked)
-                                    marked = tx.markFinalizing(USER_FINISH);
-
-                                if (!marked)
-                                    return null;
-                            }
-
-                            if (tx.local())
-                                return null;
-
-                            if (!marked)
-                                marked = tx.markFinalizing(USER_FINISH);
-
-                            if (marked)
-                                tx.addEntry(cacheCtx, txEntry.txKey(), txEntry.keyBytes(), txEntry.op(), txEntry.value(),
-                                    txEntry.valueBytes(), txEntry.drVersion());
-                            else
-                                return null;
-
-                            if (req.groupLock()) {
-                                tx.markGroupLock();
-
-                                if (!txEntry.groupLockEntry())
-                                    tx.groupLockKey(txEntry.txKey());
-                            }
-
-                            // Add remote candidate before reordering.
-                            if (txEntry.explicitVersion() == null && !txEntry.groupLockEntry())
-                                entry.addRemote(
-                                    req.nearNodeId(),
-                                    nodeId,
-                                    req.threadId(),
-                                    req.version(),
-                                    0,
-                                    /*tx*/true,
-                                    tx.implicitSingle(),
-                                    null
-                                );
-                        }
-
-                        // Double-check in case if sender node left the grid.
-                        if (ctx.discovery().node(req.nearNodeId()) == null) {
-                            if (log.isDebugEnabled())
-                                log.debug("Node requesting lock left grid (lock request will be ignored): " + req);
-
-                            if (tx != null)
-                                tx.rollback();
-
-                            return null;
-                        }
-
-                        // Entry is legit.
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        assert entry.obsoleteVersion() != null : "Obsolete flag not set on removed entry: " +
-                            entry;
-
-                        if (log.isDebugEnabled())
-                            log.debug("Received entry removed exception (will retry on renewed entry): " + entry);
-
-                        if (tx != null) {
-                            tx.clearEntry(txEntry.txKey());
-
-                            if (log.isDebugEnabled())
-                                log.debug("Cleared removed entry from remote transaction (will retry) [entry=" +
-                                    entry + ", tx=" + tx + ']');
-                        }
-
-                        // Will retry in while loop.
-                    }
-                }
-            }
-        }
-        else {
-            String err = "Failed to acquire deployment class loader for message: " + req;
-
-            U.warn(log, err);
-
-            throw new IgniteCheckedException(err);
-        }
-
-        return tx;
-    }
-
-    /**
      * @param nodeId Node ID.
      * @param req Request.
      */
@@ -1437,83 +1102,4 @@ public class IgniteTxHandler<K, V> {
 
         fut.onResult(nodeId, res);
     }
-
-    /**
-     * @param nodeId Node ID.
-     * @param req Request.
-     */
-    protected void processCheckCommittedTxRequest(final UUID nodeId,
-        final GridCachePessimisticCheckCommittedTxRequest<K, V> req) {
-        if (log.isDebugEnabled())
-            log.debug("Processing check committed transaction request [nodeId=" + nodeId + ", req=" + req + ']');
-
-        IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut = ctx.tm().checkPessimisticTxCommitted(req);
-
-        infoFut.listenAsync(new CI1<IgniteFuture<GridCacheCommittedTxInfo<K, V>>>() {
-            @Override public void apply(IgniteFuture<GridCacheCommittedTxInfo<K, V>> infoFut) {
-                GridCacheCommittedTxInfo<K, V> info = null;
-
-                try {
-                    info = infoFut.get();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to obtain committed info for transaction (will rollback): " + req, e);
-                }
-
-                GridCachePessimisticCheckCommittedTxResponse<K, V>
-                    res = new GridCachePessimisticCheckCommittedTxResponse<>(
-                    req.version(), req.futureId(), req.miniId(), info);
-
-                if (log.isDebugEnabled())
-                    log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + res + ']');
-
-                sendCheckCommittedResponse(nodeId, res);            }
-        });
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param res Response.
-     */
-    protected void processCheckCommittedTxResponse(UUID nodeId,
-        GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
-        if (log.isDebugEnabled())
-            log.debug("Processing check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
-
-        GridCachePessimisticCheckCommittedTxFuture<K, V> fut =
-            (GridCachePessimisticCheckCommittedTxFuture<K, V>)ctx.mvcc().<GridCacheCommittedTxInfo<K, V>>future(
-                res.version(), res.futureId());
-
-        if (fut == null) {
-            if (log.isDebugEnabled())
-                log.debug("Received response for unknown future (will ignore): " + res);
-
-            return;
-        }
-
-        fut.onResult(nodeId, res);
-    }
-
-    /**
-     * Sends check committed response to remote node.
-     *
-     * @param nodeId Node ID to send to.
-     * @param res Reponse to send.
-     */
-    private void sendCheckCommittedResponse(UUID nodeId, GridCachePessimisticCheckCommittedTxResponse<K, V> res) {
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
-
-            ctx.io().send(nodeId, res);
-        }
-        catch (ClusterTopologyException ignored) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to send check committed transaction response (did node leave grid?) [nodeId=" +
-                    nodeId + ", res=" + res + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send response to node [nodeId=" + nodeId + ", res=" + res + ']', e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index b0934f1..d1a2be5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.lang.*;
@@ -93,6 +94,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
     /** Active cache IDs. */
     protected Set<Integer> activeCacheIds = new HashSet<>();
 
+    /** Need return value. */
+    protected boolean needRetVal;
+
+    /** Implicit transaction result. */
+    protected GridCacheReturn<V> implicitRes = new GridCacheReturn<>(false);
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -264,6 +271,32 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             writeView.seal();
     }
 
+    /** {@inheritDoc} */
+    @Override public GridCacheReturn<V> implicitSingleResult() {
+        return implicitRes;
+    }
+
+    /**
+     * @param ret Result.
+     */
+    public void implicitSingleResult(GridCacheReturn<V> ret) {
+        implicitRes = ret;
+    }
+
+    /**
+     * @return Flag indicating whether transaction needs return value.
+     */
+    public boolean needReturnValue() {
+        return needRetVal;
+    }
+
+    /**
+     * @param needRetVal Need return value flag.
+     */
+    public void needReturnValue(boolean needRetVal) {
+        this.needRetVal = needRetVal;
+    }
+
     /**
      * @param snd {@code True} if values in tx entries should be replaced with transformed values and sent
      * to remote nodes.
@@ -650,9 +683,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
             addGroupTxMapping(writeSet());
 
         if (!empty) {
-            // We are holding transaction-level locks for entries here, so we can get next write version.
-            writeVersion(cctx.versions().next(topologyVersion()));
-
             batchStoreCommit(writeMap().values());
 
             try {
@@ -778,7 +808,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             evt,
                                             metrics,
                                             topVer,
-                                            txEntry.filters(),
+                                            null,
                                             cached.detached() ? DR_NONE : drType,
                                             txEntry.drExpireTime(),
                                             cached.isNear() ? null : explicitVer,
@@ -815,7 +845,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                             evt,
                                             metrics,
                                             topVer,
-                                            txEntry.filters(),
+                                            null,
                                             cached.detached()  ? DR_NONE : drType,
                                             cached.isNear() ? null : explicitVer,
                                             CU.subjectId(this, cctx),
@@ -1856,7 +1886,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
      */
     private boolean filter(GridCacheEntryEx<K, V> cached,
         IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException {
-        return pessimistic() || cached.context().isAll(cached, filter);
+        return pessimistic() || (optimistic() && implicit()) || cached.context().isAll(cached, filter);
     }
 
     /**
@@ -1994,7 +2024,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                             boolean readThrough = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter);
 
-                            if (optimistic()) {
+                            if (optimistic() && !implicit()) {
                                 try {
                                     // Should read through if filter is specified.
                                     old = entry.innerGet(this,
@@ -2011,6 +2041,11 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                                         CU.<K, V>empty(),
                                         null);
                                 }
+                                catch (ClusterTopologyException e) {
+                                    entry.context().evicts().touch(entry, topologyVersion());
+
+                                    throw e;
+                                }
                                 catch (GridCacheFilterFailedException e) {
                                     e.printStackTrace();
 
@@ -2072,7 +2107,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
 
                             enlisted.add(key);
 
-                            if (!pessimistic() || (groupLock() && !lockOnly)) {
+                            if ((!pessimistic() && !implicit()) || (groupLock() && !lockOnly)) {
                                 txEntry.markValid();
 
                                 if (old == null) {
@@ -2463,11 +2498,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         @Nullable final Map<? extends K, GridCacheDrInfo<V>> drMap,
         final boolean retval,
         @Nullable GridCacheEntryEx<K, V> cached,
-        @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) {
+        @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+    ) {
         assert filter == null || invokeMap == null;
 
         cacheCtx.checkSecurity(GridSecurityPermission.CACHE_PUT);
 
+        if (retval)
+            needReturnValue(true);
+
         // Cached entry may be passed only from entry wrapper.
         final Map<K, V> map0;
         final Map<K, EntryProcessor<K, V, Object>> invokeMap0;
@@ -2651,13 +2690,34 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         cctx.kernalContext());
             }
             else {
-                return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() {
-                    @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException {
-                        f.get();
+                if (implicit()) {
+                    // Should never load missing values for implicit transaction as values will be returned
+                    // with prepare response, if required.
+                    assert loadFut.isDone();
 
-                        return ret;
+                    try {
+                        loadFut.get();
                     }
-                });
+                    catch (IgniteCheckedException e) {
+                        return new GridFinishedFutureEx<>(new GridCacheReturn<V>(), e);
+                    }
+
+                    return commitAsync().chain(new CX1<IgniteFuture<IgniteTx>, GridCacheReturn<V>>() {
+                        @Override public GridCacheReturn<V> applyx(IgniteFuture<IgniteTx> txFut) throws IgniteCheckedException {
+                            txFut.get();
+
+                            return implicitRes;
+                        }
+                    });
+                }
+                else
+                    return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() {
+                        @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException {
+                            f.get();
+
+                            return ret;
+                        }
+                    });
             }
         }
         catch (IgniteCheckedException e) {
@@ -2696,6 +2756,9 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter) {
         cacheCtx.checkSecurity(GridSecurityPermission.CACHE_REMOVE);
 
+        if (retval)
+            needReturnValue(true);
+
         final Collection<? extends K> keys0;
 
         if (drMap != null) {
@@ -2851,13 +2914,27 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
                         cctx.kernalContext());
             }
             else {
-                return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() {
-                    @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException {
-                        f.get();
+                if (implicit()) {
+                    // Should never load missing values for implicit transaction as values will be returned
+                    // with prepare response, if required.
+                    assert loadFut.isDone();
 
-                        return ret;
-                    }
-                });
+                    return commitAsync().chain(new CX1<IgniteFuture<IgniteTx>, GridCacheReturn<V>>() {
+                        @Override public GridCacheReturn<V> applyx(IgniteFuture<IgniteTx> txFut) throws IgniteCheckedException {
+                            txFut.get();
+
+                            return implicitRes;
+                        }
+                    });
+                }
+                else
+                    return loadFut.chain(new CX1<IgniteFuture<Set<K>>, GridCacheReturn<V>>() {
+                        @Override public GridCacheReturn<V> applyx(IgniteFuture<Set<K>> f) throws IgniteCheckedException {
+                            f.get();
+
+                            return ret;
+                        }
+                    });
             }
         }
         catch (IgniteCheckedException e) {
@@ -3168,7 +3245,6 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
         }
 
         txEntry.filtersSet(filtersSet);
-        txEntry.transferRequired(true);
 
         while (true) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 22bf372..267e43d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -153,6 +153,11 @@ public interface IgniteTxLocalEx<K, V> extends IgniteTxEx<K, V> {
     public boolean partitionLock();
 
     /**
+     * @return Implicit transaction result.
+     */
+    public GridCacheReturn<V> implicitSingleResult();
+
+    /**
      * Finishes transaction (either commit or rollback).
      *
      * @param commit {@code True} if commit, {@code false} if rollback.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fae1989..df9f409 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -87,15 +87,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         new ConcurrentSkipListMap<>();
 
     /** Committed local transactions. */
-    private final GridBoundedConcurrentOrderedSet<GridCacheVersion> committedVers =
-        new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
-
-    /** Rolled back local transactions. */
-    private final NavigableSet<GridCacheVersion> rolledbackVers =
-        new GridBoundedConcurrentOrderedSet<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
-
-    /** Pessimistic commit buffer. */
-    private GridCacheTxCommitBuffer<K, V> pessimisticRecoveryBuf;
+    private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers =
+        new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(GG_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT));
 
     /** Transaction synchronizations. */
     private final Collection<IgniteTxSynchronization> syncs =
@@ -147,8 +140,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        pessimisticRecoveryBuf = new GridCachePerThreadTxCommitBuffer<>(cctx);
-
         txFinishSync = new GridCacheTxFinishSync<>(cctx);
 
         txHandler = new IgniteTxHandler<>(cctx);
@@ -290,11 +281,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         X.println(">>>   prepareQueueSize: " + prepareQ.size());
         X.println(">>>   startVerCntsSize [size=" + startVerCnts.size() +
             ", firstVer=" + startVerEntry + ']');
-        X.println(">>>   committedVersSize: " + committedVers.size());
-        X.println(">>>   rolledbackVersSize: " + rolledbackVers.size());
-
-        if (pessimisticRecoveryBuf != null)
-            X.println(">>>   pessimsticCommitBufSize: " + pessimisticRecoveryBuf.size());
+        X.println(">>>   completedVersSize: " + completedVers.size());
     }
 
     /**
@@ -335,15 +322,8 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     /**
      * @return Committed versions size.
      */
-    public int committedVersionsSize() {
-        return committedVers.size();
-    }
-
-    /**
-     * @return Rolled back versions size.
-     */
-    public int rolledbackVersionsSize() {
-        return rolledbackVers.size();
+    public int completedVersionsSize() {
+        return completedVers.size();
     }
 
     /**
@@ -353,7 +333,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      *      {@code false} otherwise.
      */
     public boolean isCompleted(IgniteTxEx<K, V> tx) {
-        return committedVers.contains(tx.xidVersion()) || rolledbackVers.contains(tx.xidVersion());
+        return completedVers.containsKey(tx.xidVersion());
     }
 
     /**
@@ -753,7 +733,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled();
 
         // Clean up committed transactions queue.
-        if (tx.pessimistic()) {
+        if (tx.pessimistic() && tx.local()) {
             if (tx.enforceSerializable() && txSerializableEnabled) {
                 for (Iterator<IgniteTxEx<K, V>> it = committedQ.iterator(); it.hasNext();) {
                     IgniteTxEx<K, V> committedTx = it.next();
@@ -863,7 +843,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         }
 
         // Optimistic.
-        assert tx.optimistic();
+        assert tx.optimistic() || !tx.local();
 
         if (!lockMultiple(tx, tx.optimisticLockEntries())) {
             tx.setRollbackOnly();
@@ -943,14 +923,17 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     }
 
     /**
-     * @param c Collection to copy.
+     * @param map Map of versions to copy keys from.
+     * @param expVal Expected value; only keys mapped to this value are copied.
      * @return Copy of the collection.
      */
-    private Collection<GridCacheVersion> copyOf(Iterable<GridCacheVersion> c) {
+    private Collection<GridCacheVersion> copyOf(Map<GridCacheVersion, Boolean> map, boolean expVal) {
         Collection<GridCacheVersion> l = new LinkedList<>();
 
-        for (GridCacheVersion v : c)
-            l.add(v);
+        for (Map.Entry<GridCacheVersion, Boolean> e : map.entrySet()) {
+            if (e.getValue() == expVal)
+                l.add(e.getKey());
+        }
 
         return l;
     }
@@ -962,9 +945,10 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      * @return Committed transactions starting from the given version (non-inclusive).
      */
     public Collection<GridCacheVersion> committedVersions(GridCacheVersion min) {
-        Set<GridCacheVersion> set = committedVers.tailSet(min, true);
+        ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
+            = completedVers.tailMap(min, true);
 
-        return set == null || set.isEmpty() ? Collections.<GridCacheVersion>emptyList() : copyOf(set);
+        return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, true);
     }
 
     /**
@@ -974,16 +958,17 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      * @return Committed transactions starting from the given version (non-inclusive).
      */
     public Collection<GridCacheVersion> rolledbackVersions(GridCacheVersion min) {
-        Set<GridCacheVersion> set = rolledbackVers.tailSet(min, true);
+        ConcurrentNavigableMap<GridCacheVersion, Boolean> tail
+            = completedVers.tailMap(min, true);
 
-        return set == null || set.isEmpty() ? Collections.<GridCacheVersion>emptyList() : copyOf(set);
+        return F.isEmpty(tail) ? Collections.<GridCacheVersion>emptyList() : copyOf(tail, false);
     }
 
     /**
      * @param tx Tx to remove.
      */
     public void removeCommittedTx(IgniteTxEx<K, V> tx) {
-        committedVers.remove(tx.xidVersion());
+        completedVers.remove(tx.xidVersion(), true);
     }
 
     /**
@@ -1008,12 +993,12 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      * @return If transaction was not already present in completed set.
      */
     public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) {
-        assert !rolledbackVers.contains(xidVer) : "Version was rolled back: " + xidVer;
-
         if (nearXidVer != null)
             xidVer = new CommittedVersion(xidVer, nearXidVer);
 
-        if (committedVers.add(xidVer)) {
+        Boolean committed = completedVers.putIfAbsent(xidVer, true);
+
+        if (committed == null || committed) {
             if (log.isDebugEnabled())
                 log.debug("Added transaction to committed version set: " + xidVer);
 
@@ -1021,7 +1006,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         }
         else {
             if (log.isDebugEnabled())
-                log.debug("Transaction is already present in committed version set: " + xidVer);
+                log.debug("Transaction is already present in rolled back version set: " + xidVer);
 
             return false;
         }
@@ -1032,9 +1017,9 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      * @return If transaction was not already present in completed set.
      */
     public boolean addRolledbackTx(GridCacheVersion xidVer) {
-        assert !committedVers.contains(xidVer);
+        Boolean committed = completedVers.putIfAbsent(xidVer, false);
 
-        if (rolledbackVers.add(xidVer)) {
+        if (committed == null || !committed) {
             if (log.isDebugEnabled())
                 log.debug("Added transaction to rolled back version set: " + xidVer);
 
@@ -1042,7 +1027,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
         }
         else {
             if (log.isDebugEnabled())
-                log.debug("Transaction is already present in rolled back version set: " + xidVer);
+                log.debug("Transaction is already present in committed version set: " + xidVer);
 
             return false;
         }
@@ -1172,13 +1157,15 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
          * so we don't do it here.
          */
 
+        Boolean committed = completedVers.get(tx.xidVersion());
+
         // 1. Make sure that committed version has been recorded.
-        if (!(committedVers.contains(tx.xidVersion()) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
+        if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
             uncommitTx(tx);
 
             throw new IgniteException("Missing commit version (consider increasing " +
                 GG_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" +
-                committedVers.firstx() + ", lastVer=" + committedVers.lastx() + ", tx=" + tx.xid() + ']');
+                completedVers.firstKey() + ", lastVer=" + completedVers.lastKey() + ", tx=" + tx.xid() + ']');
         }
 
         ConcurrentMap<GridCacheVersion, IgniteTxEx<K, V>> txIdMap = transactionMap(tx);
@@ -1197,9 +1184,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
             for (GridCacheContext<K, V> cacheCtx : cctx.cacheContexts())
                 cacheCtx.dataStructures().onTxCommitted(tx);
 
-            // 3.2 Add to pessimistic commit buffer if needed.
-            addPessimisticRecovery(tx);
-
             // 4. Unlock write resources.
             if (tx.groupLock())
                 unlockGroupLocks(tx);
@@ -1513,7 +1497,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
      */
     private boolean lockMultiple(IgniteTxEx<K, V> tx, Iterable<IgniteTxEntry<K, V>> entries)
         throws IgniteCheckedException {
-        assert tx.optimistic();
+        assert tx.optimistic() || !tx.local();
 
         long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout());
 
@@ -1524,7 +1508,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
 
         for (IgniteTxEntry<K, V> txEntry1 : entries) {
             // Check if this entry was prepared before.
-            if (!txEntry1.markPrepared())
+            if (!txEntry1.markPrepared() || txEntry1.explicitVersion() != null)
                 continue;
 
             GridCacheContext<K, V> cacheCtx = txEntry1.context();
@@ -1638,11 +1622,11 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
                 try {
                     GridCacheEntryEx<K, V> entry = txEntry.cached();
 
+                    assert entry != null;
+
                     if (entry.detached())
                         break;
 
-                    assert entry != null;
-
                     entry.txUnlock(tx);
 
                     break;
@@ -1803,7 +1787,12 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
 
         // Not all transactions were found. Need to scan committed versions to check
         // if transaction was already committed.
-        for (GridCacheVersion ver : committedVers) {
+        for (Map.Entry<GridCacheVersion, Boolean> e : completedVers.entrySet()) {
+            if (!e.getValue())
+                continue;
+
+            GridCacheVersion ver = e.getKey();
+
             if (processedVers != null && processedVers.contains(ver))
                 continue;
 
@@ -1821,37 +1810,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     }
 
     /**
-     * Adds transaction to pessimistic recovery buffer if needed.
-     *
-     * @param tx Committed transaction to add.
-     */
-    private void addPessimisticRecovery(IgniteTxEx<K, V> tx) {
-        if (pessimisticRecoveryBuf == null)
-            return;
-
-        // Do not store recovery information for optimistic or replicated local transactions.
-        if (tx.optimistic() || (tx.local() && tx.replicated()))
-            return;
-
-        pessimisticRecoveryBuf.addCommittedTx(tx);
-    }
-
-    /**
-     * Checks whether transaction with given near version was committed on this node and returns commit info.
-     *
-     * @param nearTxVer Near tx version.
-     * @param originatingNodeId Originating node ID.
-     * @param originatingThreadId Originating thread ID.
-     * @return Commit info, if present.
-     */
-    @Nullable public GridCacheCommittedTxInfo<K, V> txCommitted(GridCacheVersion nearTxVer,
-        UUID originatingNodeId, long originatingThreadId) {
-        assert pessimisticRecoveryBuf != null : "Should not be called for LOCAL cache.";
-
-        return pessimisticRecoveryBuf.committedTx(nearTxVer, originatingNodeId, originatingThreadId);
-    }
-
-    /**
      * Gets local transaction for pessimistic tx recovery.
      *
      * @param nearXidVer Near tx ID.
@@ -1952,8 +1910,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
                         continue;
                     }
 
-                    ((IgniteTxAdapter<K, V>)tx).recoveryWrites(commitInfo.recoveryWrites());
-
                     // If write was not found, check read.
                     IgniteTxEntry<K, V> read = tx.readMap().remove(entry.txKey());
 
@@ -1974,45 +1930,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
     }
 
     /**
-     * @param req Check committed request.
-     * @return Check committed future.
-     */
-    public IgniteFuture<GridCacheCommittedTxInfo<K, V>> checkPessimisticTxCommitted(GridCachePessimisticCheckCommittedTxRequest req) {
-        // First check if we have near transaction with this ID.
-        IgniteTxEx<K, V> tx = localTxForRecovery(req.nearXidVersion(), !req.nearOnlyCheck());
-
-        // Either we found near transaction or one of transactions is being committed by user.
-        // Wait for it and send reply.
-        if (tx != null) {
-            assert tx.local();
-
-            if (log.isDebugEnabled())
-                log.debug("Found active near transaction, will wait for completion [req=" + req + ", tx=" + tx + ']');
-
-            final IgniteTxEx<K, V> tx0 = tx;
-
-            return tx.finishFuture().chain(new C1<IgniteFuture<IgniteTx>, GridCacheCommittedTxInfo<K, V>>() {
-                @Override public GridCacheCommittedTxInfo<K, V> apply(IgniteFuture<IgniteTx> txFut) {
-                    GridCacheCommittedTxInfo<K, V> info = null;
-
-                    if (tx0.state() == COMMITTED)
-                        info = new GridCacheCommittedTxInfo<>(tx0);
-
-                    return info;
-                }
-            });
-        }
-
-        GridCacheCommittedTxInfo<K, V> info = txCommitted(req.nearXidVersion(), req.originatingNodeId(),
-            req.originatingThreadId());
-
-        if (info == null)
-            info = txCommitted(req.nearXidVersion(), req.originatingNodeId(), req.originatingThreadId());
-
-        return new GridFinishedFutureEx<>(info);
-    }
-
-    /**
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
@@ -2050,7 +1967,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
                         // Invalidate transactions.
                         salvageTx(tx, false, RECOVERY_FINISH);
                     }
-                    else if (tx.optimistic()) {
+                    else {
                         // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.
                         if (tx.originatingNodeId().equals(evtNodeId)) {
                             if (tx.state() == PREPARED)
@@ -2062,23 +1979,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
                             }
                         }
                     }
-                    else {
-                        // Pessimistic.
-                        if (tx.originatingNodeId().equals(evtNodeId)) {
-                            if (tx.state() != COMMITTING && tx.state() != COMMITTED)
-                                commitIfRemotelyCommitted(tx);
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Skipping pessimistic transaction check (transaction is being committed) " +
-                                        "[tx=" + tx + ", locNodeId=" + cctx.localNodeId() + ']');
-                            }
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Skipping pessimistic transaction check [tx=" + tx +
-                                    ", evtNodeId=" + evtNodeId + ", locNodeId=" + cctx.localNodeId() + ']');
-                        }
-                    }
                 }
             }
             finally {
@@ -2097,7 +1997,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
             assert !F.isEmpty(tx.transactionNodes());
             assert tx.nearXidVersion() != null;
 
-
             GridCacheOptimisticCheckPreparedTxFuture<K, V> fut = new GridCacheOptimisticCheckPreparedTxFuture<>(
                 cctx, tx, evtNodeId, tx.transactionNodes());
 
@@ -2108,25 +2007,6 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
 
             fut.prepare();
         }
-
-        /**
-         * Commits pessimistic transaction if at least one of remote nodes has committed this transaction.
-         *
-         * @param tx Transaction.
-         */
-        private void commitIfRemotelyCommitted(IgniteTxEx<K, V> tx) {
-            assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
-
-            GridCachePessimisticCheckCommittedTxFuture<K, V> fut = new GridCachePessimisticCheckCommittedTxFuture<>(
-                cctx, tx, evtNodeId);
-
-            cctx.mvcc().addFuture(fut);
-
-            if (log.isDebugEnabled())
-                log.debug("Checking pessimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']');
-
-            fut.prepare();
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java
index 3ac8227..3a46734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxProxyImpl.java
@@ -71,6 +71,13 @@ public class IgniteTxProxyImpl<K, V> implements IgniteTxProxy, Externalizable {
     }
 
     /**
+     * @return Transaction.
+     */
+    public IgniteTxEx<K, V> tx() {
+        return tx;
+    }
+
+    /**
      * Enters a call.
      */
     private void enter() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
index 265e2e2..549a18e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java
@@ -189,10 +189,10 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
                     if (lsnr != null)
                         lsnr.apply(key, val);
                 }
-                catch (NoSuchElementException e1) {
-                    e1.printStackTrace(); // Should never happen.
+                catch (NoSuchElementException ignored) {
+                    cnt.incrementAndGet();
 
-                    assert false : "Internal error in grid bounded ordered set.";
+                    return;
                 }
             }
         }
@@ -226,7 +226,12 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
      * @return {@inheritDoc}
      */
     @Override public V remove(Object o) {
-        throw new UnsupportedOperationException("Remove is not supported on concurrent bounded map.");
+        V old = super.remove(o);
+
+        if (old != null)
+            cnt.decrementAndGet();
+
+        return old;
     }
 
     /**
@@ -237,6 +242,11 @@ public class GridBoundedConcurrentOrderedMap<K, V> extends ConcurrentSkipListMap
      * @return {@inheritDoc}
      */
     @Override public boolean remove(Object key, Object val) {
-        throw new UnsupportedOperationException("Remove is not supported on concurrent bounded map.");
+        boolean rmvd = super.remove(key, val);
+
+        if (rmvd)
+            cnt.decrementAndGet();
+
+        return rmvd;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 7f17746..d92ad4d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -461,11 +461,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     public void testGetAll() throws Exception {
         IgniteTx tx = txEnabled() ? cache().txStart() : null;
 
-        cache().put("key1", 1);
-        cache().put("key2", 2);
+        try {
+            cache().put("key1", 1);
+            cache().put("key2", 2);
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
 
         assert cache().getAll(null).isEmpty();
         assert cache().getAll(Collections.<String>emptyList()).isEmpty();
@@ -494,30 +500,36 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         if (txEnabled()) {
             tx = cache().txStart();
 
-            assert cache().getAll(null).isEmpty();
-            assert cache().getAll(Collections.<String>emptyList()).isEmpty();
+            try {
+                assert cache().getAll(null).isEmpty();
+                assert cache().getAll(Collections.<String>emptyList()).isEmpty();
 
-            map1 = cache().getAll(F.asList("key1", "key2", "key9999"));
+                map1 = cache().getAll(F.asList("key1", "key2", "key9999"));
 
-            info("Retrieved map1: " + map1);
+                info("Retrieved map1: " + map1);
 
-            assert 2 == map1.size() : "Invalid map: " + map1;
+                assert 2 == map1.size() : "Invalid map: " + map1;
 
-            assertEquals(1, (int)map1.get("key1"));
-            assertEquals(2, (int)map1.get("key2"));
-            assertNull(map1.get("key9999"));
+                assertEquals(1, (int)map1.get("key1"));
+                assertEquals(2, (int)map1.get("key2"));
+                assertNull(map1.get("key9999"));
 
-            map2 = cache().getAll(F.asList("key1", "key2", "key9999"));
+                map2 = cache().getAll(F.asList("key1", "key2", "key9999"));
 
-            info("Retrieved map2: " + map2);
+                info("Retrieved map2: " + map2);
 
-            assert 2 == map2.size() : "Invalid map: " + map2;
+                assert 2 == map2.size() : "Invalid map: " + map2;
 
-            assertEquals(1, (int)map2.get("key1"));
-            assertEquals(2, (int)map2.get("key2"));
-            assertNull(map2.get("key9999"));
+                assertEquals(1, (int)map2.get("key1"));
+                assertEquals(2, (int)map2.get("key2"));
+                assertNull(map2.get("key9999"));
 
-            tx.commit();
+                tx.commit();
+            }
+            finally {
+                if (tx != null)
+                    tx.close();
+            }
         }
     }
 
@@ -571,14 +583,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         if (txEnabled()) {
             IgniteTx tx = cache().txStart();
 
-            cache().put("key1", 100);
-            cache().put("key2", 101);
-            cache().put("key3", 200);
-            cache().put("key4", 201);
-
-            tx.commit();
+            try {
+                cache().put("key1", 100);
+                cache().put("key2", 101);
+                cache().put("key3", 200);
+                cache().put("key4", 201);
 
-            tx.close();
+                tx.commit();
+            }
+            finally {
+                tx.close();
+            }
 
             tx = cache().txStart(PESSIMISTIC, REPEATABLE_READ);
 
@@ -731,27 +746,27 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
      */
     public void testPutTx() throws Exception {
         if (txEnabled()) {
-            IgniteTx tx = cache().txStart();
-
-            assert cache().put("key1", 1) == null;
-            assert cache().put("key2", 2) == null;
+            try (IgniteTx tx = cache().txStart()) {
+                assert cache().put("key1", 1) == null;
+                assert cache().put("key2", 2) == null;
 
-            // Check inside transaction.
-            assert cache().get("key1") == 1;
-            assert cache().get("key2") == 2;
+                // Check inside transaction.
+                assert cache().get("key1") == 1;
+                assert cache().get("key2") == 2;
 
-            // Put again to check returned values.
-            assert cache().put("key1", 1) == 1;
-            assert cache().put("key2", 2) == 2;
+                // Put again to check returned values.
+                assert cache().put("key1", 1) == 1;
+                assert cache().put("key2", 2) == 2;
 
-            checkContainsKey(true, "key1");
-            checkContainsKey(true, "key2");
+                checkContainsKey(true, "key1");
+                checkContainsKey(true, "key2");
 
-            assert cache().get("key1") != null;
-            assert cache().get("key2") != null;
-            assert cache().get("wrong") == null;
+                assert cache().get("key1") != null;
+                assert cache().get("key2") != null;
+                assert cache().get("wrong") == null;
 
-            tx.commit();
+                tx.commit();
+            }
 
             // Check outside transaction.
             checkContainsKey(true, "key1");
@@ -1192,11 +1207,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     public void testPutFiltered() throws Exception {
         IgniteTx tx = txEnabled() ? cache().txStart() : null;
 
-        cache().put("key1", 1, F.<String, Integer>cacheNoPeekValue());
-        cache().put("key2", 100, gte100);
+        try {
+            cache().put("key1", 1, F.<String, Integer>cacheNoPeekValue());
+            cache().put("key2", 100, gte100);
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
 
         checkSize(F.asSet("key1"));
 
@@ -1207,7 +1228,6 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
         assert i == null : "Why not null?: " + i;
     }
 
-
     /**
      * @throws Exception In case of error.
      */
@@ -1330,11 +1350,17 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     public void testPutAsyncFiltered() throws Exception {
         IgniteTx tx = txEnabled() ? cache().txStart() : null;
 
-        assert cache().putAsync("key1", 1, gte100).get() == null;
-        assert cache().putAsync("key2", 101, F.<String, Integer>cacheNoPeekValue()).get() == null;
+        try {
+            assert cache().putAsync("key1", 1, gte100).get() == null;
+            assert cache().putAsync("key2", 101, F.<String, Integer>cacheNoPeekValue()).get() == null;
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
 
         checkSize(F.asSet("key2"));
 
@@ -1377,16 +1403,22 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     private void checkPutx(boolean inTx) throws Exception {
         IgniteTx tx = inTx ? cache().txStart() : null;
 
-        assert cache().putx("key1", 1);
-        assert cache().putx("key2", 2);
-        assert !cache().putx("wrong", 3, gte100);
+        try {
+            assert cache().putx("key1", 1);
+            assert cache().putx("key2", 2);
+            assert !cache().putx("wrong", 3, gte100);
 
-        // Check inside transaction.
-        assert cache().get("key1") == 1;
-        assert cache().get("key2") == 2;
+            // Check inside transaction.
+            assert cache().get("key1") == 1;
+            assert cache().get("key2") == 2;
 
-        if (tx != null)
-            tx.commit();
+            if (tx != null)
+                tx.commit();
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
 
         checkSize(F.asSet("key1", "key2"));
 
@@ -1611,25 +1643,31 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
     public void testPutxAsync() throws Exception {
         IgniteTx tx = txEnabled() ? cache().txStart() : null;
 
-        cache().put("key2", 1);
+        try {
+            cache().put("key2", 1);
 
-        IgniteFuture<Boolean> fut1 = cache().putxAsync("key1", 10);
-        IgniteFuture<Boolean> fut2 = cache().putxAsync("key2", 11);
+            IgniteFuture<Boolean> fut1 = cache().putxAsync("key1", 10);
+            IgniteFuture<Boolean> fut2 = cache().putxAsync("key2", 11);
 
-        IgniteFuture<IgniteTx> f = null;
+            IgniteFuture<IgniteTx> f = null;
 
-        if (tx != null) {
-            tx = (IgniteTx)tx.enableAsync();
+            if (tx != null) {
+                tx = (IgniteTx)tx.enableAsync();
 
-            tx.commit();
+                tx.commit();
 
-            f = tx.future();
-        }
+                f = tx.future();
+            }
 
-        assert fut1.get();
-        assert fut2.get();
+            assert fut1.get();
+            assert fut2.get();
 
-        assert f == null || f.get().state() == COMMITTED;
+            assert f == null || f.get().state() == COMMITTED;
+        }
+        finally {
+            if (tx != null)
+                tx.close();
+        }
 
         checkSize(F.asSet("key1", "key2"));
 
@@ -2017,6 +2055,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
         assertFalse(cache().putxIfAbsentAsync("key3", 4).get());
 
+        assertEquals((Integer)3, cache().get("key3"));
+
         cache().evict("key2");
         cache().clear("key3");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0354a41b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
index 466e178..1111437 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java
@@ -55,7 +55,7 @@ public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstract
      * @return Expected number of inner reads.
      */
     protected int expectedReadsPerPut(boolean isPrimary) {
-        return isPrimary ? 1 : 2;
+        return 1;
     }
 
     /**
@@ -65,7 +65,7 @@ public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstract
      * @return Expected number of misses.
      */
     protected int expectedMissesPerPut(boolean isPrimary) {
-        return isPrimary ? 1 : 2;
+        return 1;
     }
 
     /** {@inheritDoc} */


Mime
View raw message