ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [04/19] ignite git commit: Fixes: - allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction - fixed 'full_sync' mode for case when tx primary nodes fail - fixed race between statically configured cache start and
Date Mon, 18 Jan 2016 08:25:26 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 291c88a..1b40d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
@@ -44,6 +47,7 @@ import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -64,6 +68,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
 
     /** */
+    public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
@@ -122,22 +129,23 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public boolean onNodeLeft(UUID nodeId) {
+        boolean found = false;
+
         for (IgniteInternalFuture<?> fut : futures())
             if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
+                MinFuture f = (MinFuture)fut;
 
-                if (f.node().id().equals(nodeId)) {
+                if (f.onNodeLeft(nodeId)) {
                     // Remove previous mapping.
                     mappings.remove(nodeId);
 
-                    f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will fail): " + nodeId));
-
-                    return true;
+                    found = true;
                 }
             }
 
-        return false;
+        return found;
     }
 
     /** {@inheritDoc} */
@@ -156,19 +164,32 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * @param nodeId Sender.
      * @param res Result.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
-        if (!isDone())
-            for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+        if (!isDone()) {
+            FinishMiniFuture finishFut = null;
 
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.node().id().equals(nodeId);
+            synchronized (futs) {
+                for (int i = 0; i < futs.size(); i++) {
+                    IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i);
+
+                    if (fut.getClass() == FinishMiniFuture.class) {
+                        FinishMiniFuture f = (FinishMiniFuture)fut;
 
-                        f.onResult(res);
+                        if (f.futureId().equals(res.miniId())) {
+                            assert f.node().id().equals(nodeId);
+
+                            finishFut = f;
+
+                            break;
+                        }
                     }
                 }
             }
+
+            if (finishFut != null)
+                finishFut.onNearFinishResponse(res);
+        }
     }
 
     /**
@@ -178,15 +199,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
         if (!isDone())
             for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+                if (fut.getClass() == CheckBackupMiniFuture.class) {
+                    CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
 
                     if (f.futureId().equals(res.miniId())) {
                         assert f.node().id().equals(nodeId);
 
-                        f.onResult(res);
+                        f.onDhtFinishResponse(res);
                     }
                 }
+                else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+                    CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId()))
+                        f.onDhtFinishResponse(nodeId);
+                }
             }
     }
 
@@ -204,9 +231,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                 boolean marked = tx.setRollbackOnly();
 
-                if (err instanceof NodeStoppingException)
-                    return super.onDone(null, err);
-
                 if (err instanceof IgniteTxRollbackCheckedException) {
                     if (marked) {
                         try {
@@ -289,11 +313,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     * @param f Future.
+     * @param fut Future.
      * @return {@code True} if mini-future.
      */
-    private boolean isMini(IgniteInternalFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
+    private boolean isMini(IgniteInternalFuture<?> fut) {
+        return fut.getClass() == FinishMiniFuture.class ||
+            fut.getClass() == CheckBackupMiniFuture.class ||
+            fut.getClass() == CheckRemoteTxMiniFuture.class;
     }
 
     /**
@@ -393,7 +419,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                 ClusterNode backup = cctx.discovery().node(backupId);
 
-                MiniFuture mini = new MiniFuture(backup, mapping);
+                final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
 
                 add(mini);
 
@@ -414,8 +440,25 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                     readyNearMappingFromBackup(mapping);
 
-                    if (committed)
+                    if (committed) {
+                        if (tx.syncCommit()) {
+                            GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+                            assert nearXidVer != null : tx;
+
+                            IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+                            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    mini.onDone(tx);
+                                }
+                            });
+
+                            return;
+                        }
+
                         mini.onDone(tx);
+                    }
                     else {
                         ClusterTopologyCheckedException cause =
                             new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
@@ -427,46 +470,26 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                     }
                 }
                 else {
-                    GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
-                        cctx.localNodeId(),
-                        futureId(),
-                        mini.futureId(),
-                        tx.topologyVersion(),
-                        tx.xidVersion(),
-                        tx.commitVersion(),
-                        tx.threadId(),
-                        tx.isolation(),
-                        true,
-                        false,
-                        tx.system(),
-                        tx.ioPolicy(),
-                        false,
-                        true,
-                        true,
-                        null,
-                        null,
-                        null,
-                        null,
-                        0,
-                        null,
-                        0,
-                        tx.activeCachesDeploymentEnabled());
-
-                    finishReq.checkCommitted(true);
+                    GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+
+                    // Preserve old behavior, otherwise response is not sent.
+                    if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
+                        finishReq.syncCommit(true);
 
                     try {
                         if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0)
                             cctx.io().send(backup, finishReq, tx.ioPolicy());
-                        else
+                        else {
                             mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " +
                                 "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() +
                                 ", ver=" + backup.version() + ']'));
+                        }
                     }
                     catch (ClusterTopologyCheckedException e) {
-                        mini.onResult(e);
+                        mini.onNodeLeft(backupId);
                     }
                     catch (IgniteCheckedException e) {
-                        mini.onResult(e);
+                        mini.onDone(e);
                     }
                 }
             }
@@ -476,7 +499,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     *
+     * @return {@code True} if need to send finish request for one phase commit transaction.
      */
     private boolean needFinishOnePhase() {
         if (tx.mappings().empty())
@@ -584,7 +607,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 add(fut);
         }
         else {
-            MiniFuture fut = new MiniFuture(m);
+            FinishMiniFuture fut = new FinishMiniFuture(m);
 
             req.miniId(fut.futureId());
 
@@ -604,11 +627,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 // Remove previous mapping.
                 mappings.remove(m.node().id());
 
-                fut.onResult(e);
+                fut.onNodeLeft(n.id());
             }
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
-                fut.onResult(e);
+                fut.onDone(e);
             }
         }
     }
@@ -618,10 +641,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
             @SuppressWarnings("unchecked")
             @Override public String apply(IgniteInternalFuture<?> f) {
-                if (isMini(f)) {
-                    MiniFuture m = (MiniFuture)f;
+                if (f.getClass() == FinishMiniFuture.class) {
+                    FinishMiniFuture fut = (FinishMiniFuture)f;
+
+                    return "FinishFuture[node=" + fut.node().id() +
+                        ", loc=" + fut.node().isLocal() +
+                        ", done=" + fut.isDone() + "]";
+                }
+                else if (f.getClass() == CheckBackupMiniFuture.class) {
+                    CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
 
-                    return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+                    return "CheckBackupFuture[node=" + fut.node().id() +
+                        ", loc=" + fut.node().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+                else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
+                    CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
+
+                    return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
                 }
                 else
                     return "[loc=true, done=" + f.isDone() + "]";
@@ -634,108 +671,217 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
+     * @param miniId Mini future ID.
+     * @return Finish request.
+     */
+    private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+        GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
+            cctx.localNodeId(),
+            futureId(),
+            miniId,
+            tx.topologyVersion(),
+            tx.xidVersion(),
+            tx.commitVersion(),
+            tx.threadId(),
+            tx.isolation(),
+            true,
+            false,
+            tx.system(),
+            tx.ioPolicy(),
+            false,
+            tx.syncCommit(),
+            tx.syncRollback(),
+            null,
+            null,
+            null,
+            null,
+            0,
+            null,
+            0,
+            tx.activeCachesDeploymentEnabled());
+
+        finishReq.checkCommitted(true);
+
+        return finishReq;
+    }
+
+    /**
+     *
+     */
+    private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
+        /** */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /**
+          * @param nodeId Node ID.
+         * @return {@code True} if future processed node failure.
+         */
+        abstract boolean onNodeLeft(UUID nodeId);
+
+        /**
+         * @return Future ID.
+         */
+        final IgniteUuid futureId() {
+            return futId;
+        }
+    }
+
+    /**
      * Mini-future for get operations. Mini-futures are only waiting on a single
      * node as opposed to multiple nodes.
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class FinishMiniFuture extends MinFuture {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
         /** Keys. */
         @GridToStringInclude
         private GridDistributedTxMapping m;
 
-        /** Backup check flag. */
-        private ClusterNode backup;
-
         /**
          * @param m Mapping.
          */
-        MiniFuture(GridDistributedTxMapping m) {
+        FinishMiniFuture(GridDistributedTxMapping m) {
             this.m = m;
         }
 
         /**
-         * @param backup Backup to check.
-         * @param m Mapping associated with the backup.
+         * @return Node ID.
          */
-        MiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
-            this.backup = backup;
-            this.m = m;
+        ClusterNode node() {
+            return m.node();
         }
 
         /**
-         * @return Future ID.
+         * @return Keys.
          */
-        IgniteUuid futureId() {
-            return futId;
+        public GridDistributedTxMapping mapping() {
+            return m;
         }
 
         /**
-         * @return Node ID.
+         * @param nodeId Failed node ID.
          */
-        public ClusterNode node() {
-            assert m != null || backup != null;
+        boolean onNodeLeft(UUID nodeId) {
+            if (nodeId.equals(m.node().id())) {
+                if (log.isDebugEnabled())
+                    log.debug("Remote node left grid while sending or waiting for reply: " + this);
+
+                if (isSync()) {
+                    Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+                    if (txNodes != null) {
+                        Collection<UUID> backups = txNodes.get(nodeId);
+
+                        if (!F.isEmpty(backups)) {
+                            final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+
+                            add(mini);
+
+                            GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
+
+                            req.waitRemoteTransactions(true);
+
+                            for (UUID backupId : backups) {
+                                ClusterNode backup = cctx.discovery().node(backupId);
+
+                                if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+                                    if (backup.isLocal()) {
+                                        IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
+
+                                        fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                                mini.onDhtFinishResponse(cctx.localNodeId());
+                                            }
+                                        });
+                                    }
+                                    else {
+                                        try {
+                                            cctx.io().send(backup, req, tx.ioPolicy());
+                                        }
+                                        catch (ClusterTopologyCheckedException e) {
+                                            mini.onNodeLeft(backupId);
+                                        }
+                                        catch (IgniteCheckedException e) {
+                                            mini.onDone(e);
+                                        }
+                                    }
+                                }
+                                else
+                                    mini.onDhtFinishResponse(backupId);
+                            }
+                        }
+                    }
+                }
 
-            return backup != null ? backup : m.node();
+                onDone(tx);
+
+                return true;
+            }
+
+            return false;
         }
 
         /**
-         * @return Keys.
+         * @param res Result callback.
          */
-        public GridDistributedTxMapping mapping() {
-            return m;
+        void onNearFinishResponse(GridNearTxFinishResponse res) {
+            if (res.error() != null)
+                onDone(res.error());
+            else
+                onDone(tx);
         }
 
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+        }
+    }
+
+    /**
+     *
+     */
+    private class CheckBackupMiniFuture extends MinFuture {
+        /** Keys. */
+        @GridToStringInclude
+        private GridDistributedTxMapping m;
+
+        /** Backup node to check. */
+        private ClusterNode backup;
+
         /**
-         * @param e Error.
+         * @param backup Backup to check.
+         * @param m Mapping associated with the backup.
          */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-            // Fail.
-            onDone(e);
+        CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+            this.backup = backup;
+            this.m = m;
         }
 
         /**
-         * @param e Node failure.
+         * @return Node ID.
          */
-        void onResult(ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+        public ClusterNode node() {
+            return backup;
+        }
 
-            if (backup != null) {
+        /** {@inheritDoc} */
+        @Override boolean onNodeLeft(UUID nodeId) {
+            if (nodeId.equals(backup.id())) {
                 readyNearMappingFromBackup(m);
 
-                onDone(e);
-            }
-            else
-                // Complete future with tx.
-                onDone(tx);
-        }
+                onDone(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
 
-        /**
-         * @param res Result callback.
-         */
-        void onResult(GridNearTxFinishResponse res) {
-            assert backup == null;
+                return true;
+            }
 
-            if (res.error() != null)
-                onDone(res.error());
-            else
-                onDone(tx);
+            return false;
         }
 
         /**
          * @param res Response.
          */
-        void onResult(GridDhtTxFinishResponse res) {
-            assert backup != null;
-
+        void onDhtFinishResponse(GridDhtTxFinishResponse res) {
             readyNearMappingFromBackup(m);
 
             Throwable err = res.checkCommittedError();
@@ -755,9 +901,67 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 onDone(tx);
         }
 
+    }
+
+    /**
+     *
+     */
+    private class CheckRemoteTxMiniFuture extends MinFuture {
+        /** */
+        private Set<UUID> nodes;
+
+        /**
+         * @param nodes Backup nodes.
+         */
+        public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+            this.nodes = nodes;
+        }
+
+        /**
+         * @return Backup nodes.
+         */
+        Set<UUID> nodes() {
+            synchronized (this) {
+                return new HashSet<>(nodes);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean onNodeLeft(UUID nodeId) {
+            return onResponse(nodeId);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         */
+        void onDhtFinishResponse(UUID nodeId) {
+            onResponse(nodeId);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @return {@code True} if processed node response.
+         */
+        private boolean onResponse(UUID nodeId) {
+            boolean done;
+
+            boolean ret;
+
+            synchronized (this) {
+                ret = nodes.remove(nodeId);
+
+                done = nodes.isEmpty();
+            }
+
+            if (done)
+                onDone(tx);
+
+            return ret;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+            return S.toString(CheckRemoteTxMiniFuture.class, this);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 3e5e28f..65eac63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -70,6 +70,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
      * @param sys System flag.
+     * @param plc IO policy.
+     * @param syncCommit Sync commit flag.
+     * @param syncRollback Sync rollback flag.
      * @param explicitLock Explicit lock flag.
      * @param storeEnabled Store enabled flag.
      * @param topVer Topology version.
@@ -77,6 +80,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
      * @param txSize Expected transaction size.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
      * @param addDepInfo Deployment info flag.
      */
     public GridNearTxFinishRequest(

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 4904ad8..b84d2fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -99,7 +99,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
     }
 
@@ -107,7 +107,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index aa4e929f..b7b480e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -202,7 +202,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheVersion nearXidVersion() {
+    @Override public GridCacheVersion nearXidVersion() {
         return xidVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index d886243..8812709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -264,7 +264,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (ownedVals != null) {
+        if (ownedVals != null && ownedValKeys == null) {
             ownedValKeys = ownedVals.keySet();
 
             ownedValVals = ownedVals.values();
@@ -287,7 +287,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         }
 
         if (filterFailedKeys != null) {
-            for (IgniteTxKey key :filterFailedKeys) {
+            for (IgniteTxKey key : filterFailedKeys) {
                 GridCacheContext cctx = ctx.cacheContext(key.cacheId());
 
                 key.prepareMarshal(cctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 59d8b5b..dc98eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -280,28 +280,28 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        if (keyValFilter != null) {
+        if (keyValFilter != null && keyValFilterBytes == null) {
             if (addDepInfo)
                 prepareObject(keyValFilter, cctx);
 
             keyValFilterBytes = CU.marshal(cctx, keyValFilter);
         }
 
-        if (rdc != null) {
+        if (rdc != null && rdcBytes == null) {
             if (addDepInfo)
                 prepareObject(rdc, cctx);
 
             rdcBytes = CU.marshal(cctx, rdc);
         }
 
-        if (trans != null) {
+        if (trans != null && transBytes == null) {
             if (addDepInfo)
                 prepareObject(trans, cctx);
 
             transBytes = CU.marshal(cctx, trans);
         }
 
-        if (!F.isEmpty(args)) {
+        if (!F.isEmpty(args) && argsBytes == null) {
             if (addDepInfo) {
                 for (Object arg : args)
                     prepareObject(arg, cctx);
@@ -317,16 +317,16 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
         Marshaller mrsh = ctx.marshaller();
 
-        if (keyValFilterBytes != null)
+        if (keyValFilterBytes != null && keyValFilter == null)
             keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr);
 
-        if (rdcBytes != null)
+        if (rdcBytes != null && rdc == null)
             rdc = mrsh.unmarshal(rdcBytes, ldr);
 
-        if (transBytes != null)
+        if (transBytes != null && trans == null)
             trans = mrsh.unmarshal(transBytes, ldr);
 
-        if (argsBytes != null)
+        if (argsBytes != null && args == null)
             args = mrsh.unmarshal(argsBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index cce465b..ab882d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -122,11 +122,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
 
-        metaDataBytes = marshalCollection(metadata, cctx);
-        dataBytes = marshalCollection(data, cctx);
+        if (metaDataBytes == null)
+            metaDataBytes = marshalCollection(metadata, cctx);
+
+        if (dataBytes == null)
+            dataBytes = marshalCollection(data, cctx);
 
         if (addDepInfo && !F.isEmpty(data)) {
             for (Object o : data) {
@@ -144,11 +147,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
 
-        metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
-        data = unmarshalCollection(dataBytes, ctx, ldr);
+        if (metadata == null)
+            metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
+
+        if (data == null)
+            data = unmarshalCollection(dataBytes, ctx, ldr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index f5f99f5..914b4ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -160,6 +160,12 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     public long timeout(long timeout);
 
     /**
+     * Changes transaction state from COMMITTING to MARKED_ROLLBACK.
+     * Must be called only from thread committing transaction.
+     */
+    public void errorWhenCommitting();
+
+    /**
      * Modify the transaction associated with the current thread such that the
      * only possible outcome of the transaction is to roll back the
      * transaction.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 22e27c3..ed44c49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -804,6 +804,22 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
+    public final void errorWhenCommitting() {
+        synchronized (this) {
+            TransactionState prev = state;
+
+            assert prev == COMMITTING : prev;
+
+            state = MARKED_ROLLBACK;
+
+            if (log.isDebugEnabled())
+                log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
+
+            notifyAll();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean setRollbackOnly() {
         return state(MARKED_ROLLBACK);
     }
@@ -1083,7 +1099,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                 }
 
                 case MARKED_ROLLBACK: {
-                    valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING;
+                    valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED;
 
                     break;
                 }
@@ -1705,6 +1721,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Override public void errorWhenCommitting() {
+            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+        }
+
+        /** {@inheritDoc} */
         @Override public void commit() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index c42bc7f..f731975 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -828,7 +828,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
         val.marshal(ctx, context());
 
-        expiryPlcBytes = transferExpiryPlc ?  CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)) : null;
+        if (transferExpiryPlc) {
+            if (expiryPlcBytes == null)
+                expiryPlcBytes = CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+        }
+        else
+            expiryPlcBytes = null;
     }
 
     /**
@@ -871,8 +876,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
         val.unmarshal(this.ctx, clsLdr);
 
-        if (expiryPlcBytes != null)
-            expiryPlc =  ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
+        if (expiryPlcBytes != null && expiryPlc == null)
+            expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b25baf8..547c018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -721,14 +721,12 @@ public class IgniteTxHandler {
 
             IgniteInternalFuture<IgniteInternalTx> res = null;
 
-            if (tx != null) {
-                IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+            IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
 
-                // Only for error logging.
-                rollbackFut.listen(CU.errorLogger(log));
+            // Only for error logging.
+            rollbackFut.listen(CU.errorLogger(log));
 
-                res = rollbackFut;
-            }
+            res = rollbackFut;
 
             if (e instanceof Error)
                 throw (Error)e;
@@ -875,7 +873,19 @@ public class IgniteTxHandler {
             log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
 
         if (req.checkCommitted()) {
-            sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version()));
+            boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
+
+            if (!committed || !req.syncCommit())
+                sendReply(nodeId, req, committed);
+            else {
+                IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
+
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> fut) {
+                        sendReply(nodeId, req, true);
+                    }
+                });
+            }
 
             return;
         }
@@ -1044,7 +1054,7 @@ public class IgniteTxHandler {
      * @param committed {@code True} if transaction committed on this node.
      */
     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
-        if (req.replyRequired()) {
+        if (req.replyRequired() || req.checkCommitted()) {
             GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
 
             if (req.checkCommitted()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 21ff0cf..926eaf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -809,7 +809,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             catch (IgniteCheckedException ex) {
                 commitError(ex);
 
-                setRollbackOnly();
+                errorWhenCommitting();
 
                 // Safe to remove transaction from committed tx list because nothing was committed yet.
                 cctx.tm().removeCommittedTx(this);
@@ -819,7 +819,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             catch (Throwable ex) {
                 commitError(ex);
 
-                setRollbackOnly();
+                errorWhenCommitting();
 
                 // Safe to remove transaction from committed tx list because nothing was committed yet.
                 cctx.tm().removeCommittedTx(this);
@@ -1161,7 +1161,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             // Set operation to NOOP.
                             txEntry.op(NOOP);
 
-                            setRollbackOnly();
+                            errorWhenCommitting();
 
                             throw ex;
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d384e4e..ca15e20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1063,6 +1063,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
             uncommitTx(tx);
 
+            tx.errorWhenCommitting();
+
             throw new IgniteException("Missing commit version (consider increasing " +
                 IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
                 ", tx=" + tx.getClass().getSimpleName() + ']');
@@ -1616,6 +1618,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param nearVer Near version.
+     * @return Finish future for related remote transactions.
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
+        GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+
+        for (final IgniteInternalTx tx : txs()) {
+            if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
+                fut.add((IgniteInternalFuture) tx.finishFuture());
+        }
+
+        fut.markInitialized();
+
+        return fut;
+    }
+
+    /**
      * @param nearVer Near version ID.
      * @param txNum Number of transactions.
      * @param fut Result future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index 3d65304..77c802d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -105,6 +105,7 @@ public class DataStreamerRequest implements Message {
      * @param entries Entries to put.
      * @param ignoreDepOwnership Ignore ownership.
      * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
      * @param depMode Deployment mode.
      * @param sampleClsName Sample class name.
      * @param userVer User version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index cd783e4..98848ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -926,8 +926,15 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName);
 
-        if (ctx.cache().cache(cacheName) == null)
-            ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.INTERNAL, false, true).get();
+        if (ctx.cache().cache(cacheName) == null) {
+            ctx.cache().dynamicStartCache(newCfg,
+                cacheName,
+                null,
+                CacheType.INTERNAL,
+                false,
+                true,
+                true).get();
+        }
 
         assert ctx.cache().cache(cacheName) != null : cacheName;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index f4a8fad..ecb892e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -90,7 +90,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
     @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
         super.prepareMarshal(marsh);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = marsh.marshal(err);
     }
 
@@ -98,7 +98,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
     @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(marsh, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = marsh.unmarshal(errBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 65dca08..a89913f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -357,7 +357,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
 
             switch (cmd) {
                 case DESTROY_CACHE: {
-                    fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName).chain(
+                    // Do not check thread tx here since there can be active system cache txs.
+                    fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName, false).chain(
                         new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
                             @Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
                                 throws IgniteCheckedException {
@@ -369,7 +370,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
                 }
 
                 case GET_OR_CREATE_CACHE: {
-                    fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName).chain(
+                    // Do not check thread tx here since there can be active system cache txs.
+                    fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName, false).chain(
                         new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
                             @Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
                                 throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 1ea5014..8c23d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -104,6 +104,7 @@ public interface DiscoverySpi extends IgniteSpi {
      * Sets a handler for initial data exchange between Ignite nodes.
      *
      * @param exchange Discovery data exchange handler.
+     * @return {@code this} for chaining.
      */
     public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange);
 
@@ -113,6 +114,7 @@ public interface DiscoverySpi extends IgniteSpi {
      * dynamic metrics between nodes.
      *
      * @param metricsProvider Provider of metrics data.
+     * @return {@code this} for chaining.
      */
     public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 066a5fd..21204c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -204,7 +204,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * Stops streamer.
      */
     public void stop() {
-        srv.stop();
+        if (srv != null)
+            srv.stop();
 
         if (log.isDebugEnabled())
             log.debug("Socket streaming server stopped");

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
index 95ca9b5..9908b87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
@@ -324,10 +324,14 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
             @Override public void applyx(IgniteCache<String, Integer> cache) {
                 int rnd = random();
 
+                Set<Integer> ids = new HashSet<>(set);
+
                 cache.removeAll(rangeKeys(0, rnd));
 
-                for (int i = 0; i < rnd; i++)
-                    assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+                for (int i = 0; i < rnd; i++) {
+                    if (ids.contains(i))
+                        assertNull(cache.localPeek("key" + i));
+                }
             }
         });
     }
@@ -350,7 +354,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
 
                 for (int i = 0; i < rnd; i++) {
                     if (ids.contains(i))
-                        assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+                        assertNull(cache.localPeek("key" + i));
                 }
             }
         });
@@ -359,6 +363,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
     /**
      * @param cache Cache.
      * @param key Key.
+     * @return Removed value.
      */
     private <K, V> V removeAsync(IgniteCache<K, V> cache, K key) {
         IgniteCache<K, V> cacheAsync = cache.withAsync();
@@ -371,6 +376,8 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
     /**
      * @param cache Cache.
      * @param key Key.
+     * @param val Value.
+     * @return Remove result.
      */
     private <K, V> boolean removeAsync(IgniteCache<K, V> cache, K key, V val) {
         IgniteCache<K, V> cacheAsync = cache.withAsync();

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 0d9c541..1e0071e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4337,7 +4337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
                 log.info("Set iterators not cleared, will wait");
 
-                Thread.sleep(500);
+                Thread.sleep(1000);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
index a34857f..e70c97b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
@@ -299,7 +299,7 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest {
 
                         return null;
                     }
-                }));
+                }, "cache-thread"));
             }
 
             readyLatch.await();

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 8a5dfd4..c9cd750 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -158,7 +158,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
                 ccfg.setName(DYNAMIC_CACHE_NAME);
 
-                futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+                futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+                    ccfg.getName(),
+                    null,
+                    true,
+                    true,
+                    true));
 
                 return null;
             }
@@ -190,7 +195,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
         GridTestUtils.runMultiThreaded(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
 
                 return null;
             }
@@ -218,7 +223,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
                 IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
-                futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+                futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+                    ccfg.getName(),
+                    null,
+                    true,
+                    true,
+                    true));
 
                 return null;
             }
@@ -252,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             @Override public Object call() throws Exception {
                 IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
-                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
 
                 return null;
             }
@@ -315,7 +325,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
         for (int g = 0; g < nodeCount(); g++)
             caches[g] = grid(g).cache(DYNAMIC_CACHE_NAME);
 
-        kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+        kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
 
         for (int g = 0; g < nodeCount(); g++) {
             final IgniteKernal kernal0 = (IgniteKernal) grid(g);
@@ -368,7 +378,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             }
 
             // Undeploy cache.
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
 
             startGrid(nodeCount() + 1);
 
@@ -445,7 +455,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
                     }, IllegalArgumentException.class, null);
             }
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
 
             stopGrid(nodeCount() + 1);
             stopGrid(nodeCount());
@@ -512,7 +522,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             for (int g = 0; g < nodeCount() + 1; g++)
                 assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
         }
         finally {
             stopGrid(nodeCount());
@@ -554,7 +564,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             for (int g = 0; g < nodeCount() + 1; g++)
                 assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
         }
         finally {
             stopGrid(nodeCount());
@@ -600,7 +610,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             for (int g = 0; g < nodeCount() + 1; g++)
                 assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
         }
         finally {
             stopGrid(nodeCount());

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index bf6dcda..34e7080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -59,6 +60,8 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index 84838db..a08d080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -188,6 +188,9 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param conc Transaction concurrency.
+     * @param backup Check backup flag.
+     * @param commit Check commit flag.
      * @throws Exception If failed.
      */
     private void checkPrimaryNodeFailureBackupCommit(
@@ -197,6 +200,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
     ) throws Exception {
         try {
             startGrids(gridCount());
+
             awaitPartitionMapExchange();
 
             for (int i = 0; i < gridCount(); i++)
@@ -290,7 +294,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
 
                     return null;
                 }
-            });
+            }, "tx-thread");
 
             commitLatch.await();
 
@@ -366,6 +370,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite instance to generate key.
+     * @param backup Backup key flag.
      * @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for
      *      {@code ignite(1)}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
new file mode 100644
index 0000000..c47401c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheCommitDelayTxRecoveryTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRVS = 4;
+
+    /** */
+    private static volatile boolean commit;
+
+    /** */
+    private static volatile CountDownLatch commitStartedLatch;
+
+    /** */
+    private static volatile CountDownLatch commitFinishLatch;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecovery1() throws Exception {
+        checkRecovery(1, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecovery2() throws Exception {
+        checkRecovery(2, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecoveryStoreEnabled1() throws Exception {
+        checkRecovery(1, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecoveryStoreEnabled2() throws Exception {
+        checkRecovery(2, true);
+    }
+
+    /**
+     * @param backups Number of cache backups.
+     * @param useStore If {@code true} tests cache with store configured.
+     * @throws Exception If failed.
+     */
+    private void checkRecovery(int backups, boolean useStore) throws Exception {
+        startGridsMultiThreaded(SRVS, false);
+
+        client = true;
+
+        Ignite clientNode = startGrid(SRVS);
+
+        assertTrue(clientNode.configuration().isClientMode());
+
+        client = false;
+
+        clientNode.createCache(cacheConfiguration(backups, useStore));
+
+        awaitPartitionMapExchange();
+
+        Ignite srv = ignite(0);
+
+        assertFalse(srv.configuration().isClientMode());
+
+        for (Boolean pessimistic : Arrays.asList(false, true)) {
+            checkRecovery(backupKey(srv.cache(null)), srv, pessimistic, useStore);
+
+            checkRecovery(nearKey(srv.cache(null)), srv, pessimistic, useStore);
+
+            checkRecovery(nearKey(clientNode.cache(null)), clientNode, pessimistic, useStore);
+
+            srv = ignite(0);
+
+            assertFalse(srv.configuration().isClientMode());
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param ignite Node executing update.
+     * @param pessimistic If {@code true} uses pessimistic transaction.
+     * @param useStore {@code True} if store is used.
+     * @throws Exception If failed.
+     */
+    private void checkRecovery(final Integer key,
+        final Ignite ignite,
+        final boolean pessimistic,
+        final boolean useStore) throws Exception {
+        Ignite primary = primaryNode(key, null);
+
+        assertNotSame(ignite, primary);
+
+        List<Ignite> backups = backupNodes(key, null);
+
+        assertFalse(backups.isEmpty());
+
+        final Set<String> backupNames = new HashSet<>();
+
+        for (Ignite node : backups)
+            backupNames.add(node.name());
+
+        log.info("Check recovery [key=" + key +
+            ", pessimistic=" + pessimistic +
+            ", primary=" + primary.name() +
+            ", backups=" + backupNames +
+            ", node=" + ignite.name() + ']');
+
+        final IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+        cache.put(key, 0);
+
+        commitStartedLatch = new CountDownLatch(backupNames.size());
+        commitFinishLatch = new CountDownLatch(1);
+
+        commit = false;
+
+        TestEntryProcessor.skipFirst = useStore ? ignite.name() : null;
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start update.");
+
+                if (pessimistic) {
+                    try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        cache.invoke(key, new TestEntryProcessor(backupNames));
+
+                        commit = true;
+
+                        log.info("Start commit.");
+
+                        assertEquals(backupNames.size(), commitStartedLatch.getCount());
+
+                        tx.commit();
+                    }
+                }
+                else {
+                    commit = true;
+
+                    cache.invoke(key, new TestEntryProcessor(backupNames));
+                }
+
+                log.info("End update, execute get.");
+
+                Integer val = cache.get(key);
+
+                log.info("Get value: " + val);
+
+                assertEquals(1, (Object)val);
+
+                return null;
+            }
+        }, "update-thread");
+
+        assertTrue(commitStartedLatch.await(30, SECONDS));
+
+        log.info("Stop node: " + primary.name());
+
+        primary.close();
+
+        commitFinishLatch.countDown();
+
+        fut.get();
+
+        for (Ignite node : G.allGrids())
+            assertEquals(1, node.cache(null).get(key));
+
+        cache.put(key, 2);
+
+        for (Ignite node : G.allGrids())
+            assertEquals(2, node.cache(null).get(key));
+
+        startGrid(primary.name());
+
+        for (Ignite node : G.allGrids())
+            assertEquals(2, node.cache(null).get(key));
+
+        cache.put(key, 3);
+
+        for (Ignite node : G.allGrids())
+            assertEquals(3, node.cache(null).get(key));
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     *
+     */
+    static class TestEntryProcessor implements CacheEntryProcessor<Integer, Integer, Void> {
+        /** */
+        private Set<String> nodeNames;
+
+        /** Skips first call for given node (used to skip call for store update). */
+        private static String skipFirst;
+
+        /**
+         * @param nodeNames Node names where sleep will be called.
+         */
+        public TestEntryProcessor(Set<String> nodeNames) {
+            this.nodeNames = nodeNames;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<Integer, Integer> entry, Object... args) {
+            Ignite ignite = entry.unwrap(Ignite.class);
+
+            System.out.println(Thread.currentThread().getName() + " process [node=" + ignite.name() +
+                ", commit=" + commit + ", skipFirst=" + skipFirst + ']');
+
+            boolean skip = false;
+
+            if (commit && ignite.name().equals(skipFirst)) {
+                skipFirst = null;
+
+                skip = true;
+            }
+
+            if (!skip && commit && nodeNames.contains(ignite.name())) {
+                try {
+                    System.out.println(Thread.currentThread().getName() + " start process invoke.");
+
+                    assertTrue(commitStartedLatch != null && commitStartedLatch.getCount() > 0);
+
+                    commitStartedLatch.countDown();
+
+                    assertTrue(commitFinishLatch.await(10, SECONDS));
+
+                    System.out.println(Thread.currentThread().getName() + " end process invoke.");
+                }
+                catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            else
+                System.out.println(Thread.currentThread().getName() + " invoke set value.");
+
+            entry.setValue(1);
+
+            return null;
+        }
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param useStore If {@code true} adds cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(int backups, boolean useStore) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(backups);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+
+        if (useStore) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+
+            ccfg.setWriteThrough(true);
+        }
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter<Object, Object>() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 4eb8a6b..7532354 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -30,6 +30,8 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
+
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -238,7 +240,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
      * @param store If {@code true} uses cache with store.
      * @throws Exception If failed.
      */
-    private void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
+    protected final void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
         ignite(0).createCache(cacheConfiguration(memMode, store));
 
         final AtomicBoolean finished = new AtomicBoolean();
@@ -259,7 +261,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
             }
         });
 
-        IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+        final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
 
         int iter = 0;
 
@@ -309,6 +311,31 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
                     break;
                 }
 
+                case TX_PUT: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        final Integer val = ++iter;
+
+                        Ignite ignite = ignite(0);
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            final Integer key = i;
+
+                            doInTransaction(ignite, new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    cache.put(key, val);
+
+                                    return null;
+                                }
+                            });
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
+
                 case PUT_ALL: {
                     while (System.currentTimeMillis() < stopTime) {
                         Integer val = ++iter;
@@ -541,7 +568,10 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
         INVOKE,
 
         /** */
-        INVOKE_ALL
+        INVOKE_ALL,
+
+        /** */
+        TX_PUT
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 7655464..9204bc8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -101,6 +101,20 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     /**
      * @throws Exception If failed.
      */
+    public void testExplicitTransactionRetriesSingleValue() throws Exception {
+        checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExplicitTransactionRetriesSingleValueStoreEnabled() throws Exception {
+        checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testExplicitTransactionRetries() throws Exception {
         explicitTransactionRetries(TestMemoryMode.HEAP, false);
     }
@@ -108,6 +122,13 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     /**
      * @throws Exception If failed.
      */
+    public void testExplicitTransactionRetriesSingleOperation() throws Exception {
+        explicitTransactionRetries(TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testExplicitTransactionRetriesStoreEnabled() throws Exception {
         explicitTransactionRetries(TestMemoryMode.HEAP, true);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index d239ea8..91eecbb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -104,6 +105,8 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(disc);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         if (include)
             cfg.setUserAttributes(F.asMap("include", true));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 6089795..552dd28 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -569,9 +569,9 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
         GridNioServerListener lsnr,
         @Nullable Integer queueLimit) throws Exception {
         for (int i = 0; i < 10; i++) {
-            try {
-                int srvPort = port++;
+            int srvPort = port++;
 
+            try {
                 GridNioServer.Builder<?> builder = serverBuilder(srvPort, parser, lsnr);
 
                 if (queueLimit != null)
@@ -584,8 +584,11 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
                 return srvr;
             }
             catch (IgniteCheckedException e) {
-                if (i < 9 && e.hasCause(BindException.class))
-                    log.error("Failed to start server, will try another port: " + e);
+                if (i < 9 && e.hasCause(BindException.class)) {
+                    log.error("Failed to start server, will try another port [err=" + e + ", port=" + srvPort + ']');
+
+                    U.sleep(5000);
+                }
                 else
                     throw e;
             }


Mime
View raw message