ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [4/7] ignite git commit: ignite-5272 Do not always disable one-phase commit if there is near cache, instead switch to non one-phase mode if there are readers.
Date Tue, 13 Jun 2017 02:33:36 GMT
ignite-5272 Do not always disable one-phase commit if there is near cache, instead switch to non one-phase mode if there are readers.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca6cd142
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca6cd142
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca6cd142

Branch: refs/heads/ignite-5460
Commit: ca6cd142d43a47399c51a586d43c8eab6f82163d
Parents: 851b9ad
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jun 12 10:44:28 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jun 12 10:44:28 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |   3 +-
 .../processors/cache/GridCacheIoManager.java    |   1 +
 .../GridDistributedTxPrepareRequest.java        |  14 +-
 .../GridDistributedTxPrepareResponse.java       |   2 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  21 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |  16 +-
 .../distributed/dht/GridDhtGetSingleFuture.java |  10 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   3 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   2 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   8 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  11 +
 .../dht/GridPartitionedGetFuture.java           |   2 +
 .../dht/GridPartitionedSingleGetFuture.java     |  17 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  43 +--
 .../distributed/near/GridNearGetFuture.java     |   4 +-
 .../distributed/near/GridNearGetRequest.java    | 145 ++++------
 .../distributed/near/GridNearLockFuture.java    |   1 +
 .../distributed/near/GridNearLockRequest.java   |  13 +-
 .../near/GridNearTxFinishFuture.java            |  11 +-
 .../cache/distributed/near/GridNearTxLocal.java |  15 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |   5 +-
 .../near/GridNearTxPrepareResponse.java         |  14 +
 .../cache/transactions/IgniteTxEntry.java       |  22 +-
 .../cache/transactions/IgniteTxHandler.java     |   2 +
 .../IgniteTxImplicitSingleStateImpl.java        |  11 -
 .../transactions/IgniteTxLocalAdapter.java      |   6 +-
 .../cache/transactions/IgniteTxLocalState.java  |  10 -
 .../IgniteTxRemoteStateAdapter.java             |   7 -
 .../cache/transactions/IgniteTxState.java       |   6 -
 .../cache/transactions/IgniteTxStateImpl.java   |  32 ---
 .../IgniteOnePhaseCommitNearReadersTest.java    | 270 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 .../testsuites/IgniteCacheTestSuite3.java       |   4 +-
 33 files changed, 503 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index dc4a91f..b6fe90b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -608,7 +608,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
         throws IgniteCheckedException {
         assert affCalcVer != null || cctx.kernalContext().clientNode();
         assert msg.topologyVersion() != null && msg.exchangeId() == null : msg;
-        assert affCalcVer == null || affCalcVer.equals(msg.topologyVersion());
+        assert affCalcVer == null || affCalcVer.equals(msg.topologyVersion()) :
+            "Invalid version [affCalcVer=" + affCalcVer + ", msg=" + msg + ']';
 
         final AffinityTopologyVersion topVer = exchFut.topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 946d256..37e495e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -740,6 +740,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     null,
                     null,
                     null,
+                    false,
                     req.deployInfo() != null);
 
                 res.error(req.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 4b11414f..3205c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -689,15 +689,19 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         StringBuilder flags = new StringBuilder();
 
         if (needReturnValue())
-            flags.append("retVal");
+            appendFlag(flags, "retVal");
+
         if (isInvalidate())
-            flags.append("invalidate");
+            appendFlag(flags, "invalidate");
+
         if (onePhaseCommit())
-            flags.append("onePhase");
+            appendFlag(flags, "onePhase");
+
         if (last())
-            flags.append("last");
+            appendFlag(flags, "last");
+
         if (system())
-            flags.append("sys");
+            appendFlag(flags, "sys");
 
         return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
             "flags", flags.toString(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 7cd3ad8..58e9492 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -55,7 +55,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
     private int part;
 
     /** */
-    private byte flags;
+    protected byte flags;
 
     /**
      * Empty constructor (required by {@link Externalizable}).

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 418d712..38d0108 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -726,15 +726,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         return sum;
     }
 
-    /**
-     * This method is used internally. Use
-     * {@link #getDhtAsync(UUID, long, Map, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean, boolean)}
-     * method instead to retrieve DHT value.
-     *  @param keys {@inheritDoc}
-     * @param forcePrimary {@inheritDoc}
-     * @param skipTx {@inheritDoc}
-     * @param needVer Need version.  @return {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
         @Nullable Collection<? extends K> keys,
         boolean forcePrimary,
@@ -805,6 +797,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param reader Reader node ID.
      * @param msgId Message ID.
      * @param keys Keys to get.
+     * @param addReaders Add readers flag.
      * @param readThrough Read through flag.
      * @param topVer Topology version.
      * @param subjId Subject ID.
@@ -816,6 +809,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader,
         long msgId,
         Map<KeyCacheObject, Boolean> keys,
+        boolean addReaders,
         boolean readThrough,
         AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
@@ -834,7 +828,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             taskNameHash,
             expiry,
             skipVals,
-            recovery);
+            recovery,
+            addReaders);
 
         fut.init();
 
@@ -854,7 +849,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param skipVals Skip vals flag.
      * @return Future for the operation.
      */
-    private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync(
+    public GridDhtGetSingleFuture getDhtSingleAsync(
         UUID nodeId,
         long msgId,
         KeyCacheObject key,
@@ -867,7 +862,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         boolean skipVals,
         boolean recovery
     ) {
-        GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>(
+        GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>(
             ctx,
             msgId,
             nodeId,
@@ -992,7 +987,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      */
     protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
         assert ctx.affinityNode();
-        assert !req.reload() : req;
 
         final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl());
 
@@ -1000,6 +994,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             getDhtAsync(nodeId,
                 req.messageId(),
                 req.keys(),
+                req.addReaders(),
                 req.readThrough(),
                 req.topologyVersion(),
                 req.subjectId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 8031c8f..29bf3a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -112,6 +112,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     /** */
     private final boolean recovery;
 
+    /** */
+    private final boolean addReaders;
+
     /**
      * @param cctx Context.
      * @param msgId Message ID.
@@ -135,7 +138,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean recovery
+        boolean recovery,
+        boolean addReaders
     ) {
         super(CU.<GridCacheEntryInfo>collectionsReducer(keys.size()));
 
@@ -153,6 +157,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
         this.recovery = recovery;
+        this.addReaders = addReaders;
 
         futId = IgniteUuid.randomUuid();
 
@@ -346,12 +351,13 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
 
         GridCompoundFuture<Boolean, Boolean> txFut = null;
 
-        ClusterNode readerNode = cctx.discovery().node(reader);
-
         ReaderArguments readerArgs = null;
 
-        if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+        if (addReaders && !skipVals && !cctx.localNodeId().equals(reader)) {
             for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
+                if (!k.getValue())
+                    continue;
+
                 while (true) {
                     GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
 
@@ -359,7 +365,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         if (e.obsolete())
                             continue;
 
-                        boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
+                        boolean addReader = !e.deleted();
 
                         if (addReader) {
                             e.unswap(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index b9007ba..095fe77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -73,7 +73,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
     private KeyCacheObject key;
 
     /** */
-    private boolean addRdr;
+    private final boolean addRdr;
 
     /** Reserved partitions. */
     private int part = -1;
@@ -123,7 +123,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
         long msgId,
         UUID reader,
         KeyCacheObject key,
-        Boolean addRdr,
+        boolean addRdr,
         boolean readThrough,
         @NotNull AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
@@ -303,11 +303,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
 
         IgniteInternalFuture<Boolean> rdrFut = null;
 
-        ClusterNode readerNode = cctx.discovery().node(reader);
-
         ReaderArguments readerArgs = null;
 
-        if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+        if (addRdr && !skipVals && !cctx.localNodeId().equals(reader)) {
             while (true) {
                 GridDhtCacheEntry e = cache().entryExx(key, topVer);
 
@@ -315,7 +313,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
                     if (e.obsolete())
                         continue;
 
-                    boolean addReader = (!e.deleted() && this.addRdr && !skipVals);
+                    boolean addReader = !e.deleted();
 
                     if (addReader) {
                         e.unswap(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index d607ff1..acba833 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -1026,7 +1026,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                     req.createTtl(),
                     req.accessTtl(),
                     req.skipStore(),
-                    req.keepBinary());
+                    req.keepBinary(),
+                    req.nearCache());
 
                 final GridDhtTxLocal t = tx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 26f08fa..e1bc313 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -265,7 +265,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached,
         IgniteTxEntry entry, AffinityTopologyVersion topVer) {
         // Don't add local node as reader.
-        if (!cctx.localNodeId().equals(nearNodeId)) {
+        if (entry.addReader() && !cctx.localNodeId().equals(nearNodeId)) {
             GridCacheContext cacheCtx = cached.context();
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8b51cb5..c027ab9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -535,6 +535,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
      * @param accessTtl TTL for read operation.
      * @param needRetVal Return value flag.
      * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
+     * @param nearCache {@code True} if near cache enabled on originating node.
      * @return Lock future.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
@@ -547,7 +549,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         long createTtl,
         long accessTtl,
         boolean skipStore,
-        boolean keepBinary
+        boolean keepBinary,
+        boolean nearCache
     ) {
         try {
             checkValid();
@@ -612,7 +615,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                         -1L,
                         null,
                         skipStore,
-                        keepBinary);
+                        keepBinary,
+                        nearCache);
 
                     if (read)
                         txEntry.ttl(accessTtl);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 609e4cf..5b4b10f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -855,6 +855,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             ret,
             prepErr,
             null,
+            tx.onePhaseCommit(),
             tx.activeCachesDeploymentEnabled());
 
         if (prepErr == null) {
@@ -1231,6 +1232,16 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 return;
 
             if (last) {
+                if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
+                    for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+                        if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                            tx.onePhaseCommit(false);
+
+                            break;
+                        }
+                    }
+                }
+
                 int miniId = 0;
 
                 assert tx.transactionNodes() != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 5543cec..8db0ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -292,6 +292,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     cache().getDhtAsync(n.id(),
                         -1,
                         mappedKeys,
+                        false,
                         readThrough,
                         topVer,
                         subjId,
@@ -351,6 +352,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     taskName == null ? 0 : taskName.hashCode(),
                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                     expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    false,
                     skipVals,
                     cctx.deploymentEnabled(),
                     recovery);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index d3bfb3a..9eb01ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -224,11 +224,10 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             return;
 
         if (node.isLocal()) {
-            Map<KeyCacheObject, Boolean> map = Collections.singletonMap(key, false);
-
-            final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtAsync(node.id(),
+            final GridDhtFuture<GridCacheEntryInfo> fut = cctx.dht().getDhtSingleAsync(node.id(),
                 -1,
-                map,
+                key,
+                false,
                 readThrough,
                 topVer,
                 subjId,
@@ -250,14 +249,12 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
                 map(updTopVer);
             }
             else {
-                fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() {
-                    @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) {
+                fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
+                    @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> fut) {
                         try {
-                            Collection<GridCacheEntryInfo> infos = fut.get();
-
-                            assert F.isEmpty(infos) || infos.size() == 1 : infos;
+                            GridCacheEntryInfo info = fut.get();
 
-                            setResult(F.first(infos));
+                            setResult(info);
                         }
                         catch (Exception e) {
                             U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index b88eb47..de63ea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -963,29 +963,32 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
                                     first = false;
                                 }
 
-                                    assert !implicitTx() && !implicitSingleTx() : tx;req = new GridNearLockRequest(
-                                        cctx.cacheId(),
-                                        topVer,
-                                        cctx.nodeId(),
-                                        threadId,
-                                        futId,
-                                        lockVer,
-                                        inTx(),
-                                        read,
-                                        retval,
-                                        isolation(),
-                                        isInvalidate(),
-                                        timeout,
-                                        mappedKeys.size(),
-                                        inTx() ? tx.size() : mappedKeys.size(),
-                                        inTx() && tx.syncMode() == FULL_SYNC,
-                                        inTx() ? tx.subjectId() : null,
-                                        inTx() ? tx.taskNameHash() : 0,
-                                        read ? createTtl : -1L,
-                                        read ? accessTtl : -1L,
+                                assert !implicitTx() && !implicitSingleTx() : tx;
+
+                                req = new GridNearLockRequest(
+                                    cctx.cacheId(),
+                                    topVer,
+                                    cctx.nodeId(),
+                                    threadId,
+                                    futId,
+                                    lockVer,
+                                    inTx(),
+                                    read,
+                                    retval,
+                                    isolation(),
+                                    isInvalidate(),
+                                    timeout,
+                                    mappedKeys.size(),
+                                    inTx() ? tx.size() : mappedKeys.size(),
+                                    inTx() && tx.syncMode() == FULL_SYNC,
+                                    inTx() ? tx.subjectId() : null,
+                                    inTx() ? tx.taskNameHash() : 0,
+                                    read ? createTtl : -1L,
+                                    read ? accessTtl : -1L,
                                     skipStore,
                                     keepBinary,
                                     clientFirst,
+                                    false,
                                     cctx.deploymentEnabled());
 
                                 mapping.request(req);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index c7c6e9e..d4769a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -314,6 +314,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     dht().getDhtAsync(n.id(),
                         -1,
                         mappedKeys,
+                        false,
                         readThrough,
                         topVer,
                         subjId,
@@ -379,6 +380,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     taskName == null ? 0 : taskName.hashCode(),
                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                     expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    true,
                     skipVals,
                     cctx.deploymentEnabled(),
                     recovery);
@@ -445,7 +447,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         EntryGetResult res = entry.innerGetVersioned(
                             null,
                             null,
-                            /**update-metrics*/true,
+                            /*update-metrics*/true,
                             /*event*/!skipVals,
                             subjId,
                             null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 6092511..5f042d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -55,6 +54,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int READ_THROUGH_FLAG_MASK = 0x01;
+
+    /** */
+    private static final int SKIP_VALS_FLAG_MASK = 0x02;
+
+    /** */
+    private static final int ADD_READER_FLAG_MASK = 0x04;
+
+    /** */
+    public static final int RECOVERY_FLAG_MASK = 0x08;
+
     /** Future ID. */
     private IgniteUuid futId;
 
@@ -75,19 +86,10 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
     /** */
     @GridDirectCollection(boolean.class)
-    private Collection<Boolean> flags;
-
-    /** Reload flag. */
-    private boolean reload;
+    private List<Boolean> readersFlags;
 
-    /** Read through flag. */
-    private boolean readThrough;
-
-    /** Skip values flag. Used for {@code containsKey} method. */
-    private boolean skipVals;
-
-    /** Recovery flag. */
-    private boolean recovery;
+    /** */
+    private byte flags;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
@@ -139,6 +141,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         int taskNameHash,
         long createTtl,
         long accessTtl,
+        boolean addReader,
         boolean skipVals,
         boolean addDepInfo,
         boolean recovery
@@ -153,22 +156,35 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         this.ver = ver;
 
         this.keys = new ArrayList<>(keys.size());
-        flags = new ArrayList<>(keys.size());
+
+        if (addReader)
+            readersFlags = new ArrayList<>(keys.size());
 
         for (Map.Entry<KeyCacheObject, Boolean> entry : keys.entrySet()) {
             this.keys.add(entry.getKey());
-            flags.add(entry.getValue());
+
+            if (addReader)
+                readersFlags.add(entry.getValue());
         }
 
-        this.readThrough = readThrough;
         this.topVer = topVer;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.createTtl = createTtl;
         this.accessTtl = accessTtl;
-        this.skipVals = skipVals;
         this.addDepInfo = addDepInfo;
-        this.recovery = recovery;
+
+        if (readThrough)
+            flags |= READ_THROUGH_FLAG_MASK;
+
+        if (skipVals)
+            flags |= SKIP_VALS_FLAG_MASK;
+
+        if (addReader)
+            flags |= ADD_READER_FLAG_MASK;
+
+        if (recovery)
+            flags |= RECOVERY_FLAG_MASK;
     }
 
     /**
@@ -214,17 +230,10 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
     }
 
     /**
-     * @return Reload flag.
-     */
-    public boolean reload() {
-        return reload;
-    }
-
-    /**
      * @return Read through flag.
      */
     public boolean readThrough() {
-        return readThrough;
+        return (flags & READ_THROUGH_FLAG_MASK) != 0;
     }
 
     /**
@@ -232,14 +241,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
      *      returned as future result.
      */
     public boolean skipValues() {
-        return skipVals;
+        return (flags & SKIP_VALS_FLAG_MASK) != 0;
     }
 
     /**
      * @return Recovery flag.
      */
     public boolean recovery() {
-        return recovery;
+        return (flags & RECOVERY_FLAG_MASK) != 0;
+    }
+
+    public boolean addReaders() {
+        return (flags & ADD_READER_FLAG_MASK) != 0;
     }
 
     /**
@@ -277,7 +290,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
         assert ctx != null;
         assert !F.isEmpty(keys);
-        assert keys.size() == flags.size();
+        assert readersFlags == null || keys.size() == readersFlags.size();
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
@@ -297,16 +310,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         finishUnmarshalCacheObjects(keys, cctx, ldr);
 
         assert !F.isEmpty(keys);
-        assert keys.size() == flags.size();
+        assert readersFlags == null || keys.size() == readersFlags.size();
 
         if (keyMap == null) {
             keyMap = U.newLinkedHashMap(keys.size());
 
             Iterator<KeyCacheObject> keysIt = keys.iterator();
-            Iterator<Boolean> flagsIt = flags.iterator();
 
-            while (keysIt.hasNext())
-                keyMap.put(keysIt.next(), flagsIt.next());
+            for (int i = 0; i < keys.size(); i++) {
+                Boolean addRdr = readersFlags != null ? readersFlags.get(i) : Boolean.FALSE;
+
+                keyMap.put(keysIt.next(), addRdr);
+            }
         }
     }
 
@@ -343,7 +358,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("flags", flags, MessageCollectionItemType.BOOLEAN))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
@@ -367,48 +382,30 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeBoolean("readThrough", readThrough))
+                if (!writer.writeCollection("readersFlags", readersFlags, MessageCollectionItemType.BOOLEAN))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeBoolean("recovery", recovery))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeBoolean("reload", reload))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeBoolean("skipVals", skipVals))
-                    return false;
-
-                writer.incrementState();
-
-            case 13:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 11:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 12:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
-            case 16:
+            case 13:
                 if (!writer.writeMessage("ver", ver))
                     return false;
 
@@ -447,7 +444,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 5:
-                flags = reader.readCollection("flags", MessageCollectionItemType.BOOLEAN);
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -479,7 +476,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 9:
-                readThrough = reader.readBoolean("readThrough");
+                readersFlags = reader.readCollection("readersFlags", MessageCollectionItemType.BOOLEAN);
 
                 if (!reader.isLastRead())
                     return false;
@@ -487,30 +484,6 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 10:
-                recovery = reader.readBoolean("recovery");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                reload = reader.readBoolean("reload");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                skipVals = reader.readBoolean("skipVals");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 13:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -518,7 +491,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
                 reader.incrementState();
 
-            case 14:
+            case 11:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -526,7 +499,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
                 reader.incrementState();
 
-            case 15:
+            case 12:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -534,7 +507,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
                 reader.incrementState();
 
-            case 16:
+            case 13:
                 ver = reader.readMessage("ver");
 
                 if (!reader.isLastRead())
@@ -554,7 +527,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 17;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 0786f29..1118d99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1074,6 +1074,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo
                                                 skipStore,
                                                 keepBinary,
                                                 clientFirst,
+                                                true,
                                                 cctx.deploymentEnabled());
 
                                             mapping.request(req);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 4815fcf..b48693d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -55,6 +55,9 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
     /** */
     private static final int SYNC_COMMIT_FLAG_MASK = 0x04;
 
+    /** */
+    private static final int NEAR_CACHE_FLAG_MASK = 0x08;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -137,8 +140,8 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         boolean skipStore,
         boolean keepBinary,
         boolean firstClientReq,
+        boolean nearCache,
         boolean addDepInfo
-
     ) {
         super(
             cacheId,
@@ -171,6 +174,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
         setFlag(syncCommit, SYNC_COMMIT_FLAG_MASK);
         setFlag(firstClientReq, FIRST_CLIENT_REQ_FLAG_MASK);
         setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK);
+        setFlag(nearCache, NEAR_CACHE_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if near cache enabled on originating node.
+     */
+    public boolean nearCache() {
+        return isFlag(NEAR_CACHE_FLAG_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 89874ab..27cebf8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -608,10 +608,19 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
      * @return {@code True} if need to send finish request for one phase commit transaction.
      */
     private boolean needFinishOnePhase(boolean commit) {
+        assert tx.onePhaseCommit();
+
         if (tx.mappings().empty())
             return false;
 
-        return tx.txState().hasNearCache(cctx) || !commit;
+        if (!commit)
+            return true;
+
+        GridDistributedTxMapping mapping = tx.mappings().singleMapping();
+
+        assert mapping != null;
+
+        return mapping.hasNearCacheEntries();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8c10e53..55905b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1217,7 +1217,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     drExpireTime,
                                     drVer,
                                     skipStore,
-                                    keepBinary);
+                                    keepBinary,
+                                    CU.isNearEnabled(cacheCtx));
                             }
                             else {
                                 txEntry = addEntry(READ,
@@ -1232,7 +1233,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     -1L,
                                     null,
                                     skipStore,
-                                    keepBinary);
+                                    keepBinary,
+                                    CU.isNearEnabled(cacheCtx));
                             }
 
                             txEntry.markValid();
@@ -1262,7 +1264,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                         drExpireTime,
                         drVer,
                         skipStore,
-                        keepBinary);
+                        keepBinary,
+                        CU.isNearEnabled(cacheCtx));
 
                     if (enlisted != null)
                         enlisted.add(cacheKey);
@@ -1365,7 +1368,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                     drExpireTime,
                     drVer,
                     skipStore,
-                    keepBinary);
+                    keepBinary,
+                    CU.isNearEnabled(cacheCtx));
 
                 if (enlisted != null)
                     enlisted.add(cacheKey);
@@ -2201,7 +2205,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                 -1L,
                                 null,
                                 skipStore,
-                                !deserializeBinary);
+                                !deserializeBinary,
+                                CU.isNearEnabled(cacheCtx));
 
                             // As optimization, mark as checked immediately
                             // for non-pessimistic if value is not null.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 004e4da..a94d6fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -172,7 +172,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
 
             Collection<UUID> backups = entry.getValue();
 
-            if (backups.size() <= 1 && !tx.txState().hasNearCacheConfigured(cctx, tx.topologyVersion()))
+            if (backups.size() <= 1)
                 tx.onePhaseCommit(true);
         }
     }
@@ -191,6 +191,9 @@ public abstract class GridNearTxPrepareFutureAdapter extends
 
         assert res.error() == null : res;
 
+        if (tx.onePhaseCommit() && !res.onePhaseCommit())
+            tx.onePhaseCommit(false);
+
         UUID nodeId = m.primary().id();
 
         for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index ceab1f1..8162168 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -52,6 +52,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Tx onePhaseCommit flag on primary node. */
+    private static final int NEAR_PREPARE_ONE_PHASE_COMMIT_FLAG_MASK = 0x01;
+
     /** Collection of versions that are pending and less than lock version. */
     @GridToStringInclude
     @GridDirectCollection(GridCacheVersion.class)
@@ -123,6 +126,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         GridCacheReturn retVal,
         Throwable err,
         AffinityTopologyVersion clientRemapVer,
+        boolean onePhaseCommit,
         boolean addDepInfo
     ) {
         super(part, xid, err, addDepInfo);
@@ -136,6 +140,16 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         this.writeVer = writeVer;
         this.retVal = retVal;
         this.clientRemapVer = clientRemapVer;
+
+        if (onePhaseCommit)
+            flags |= NEAR_PREPARE_ONE_PHASE_COMMIT_FLAG_MASK;
+    }
+
+    /**
+     * @return One-phase commit state on primary node.
+     */
+    public boolean onePhaseCommit() {
+        return isFlag(NEAR_PREPARE_ONE_PHASE_COMMIT_FLAG_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 30aa335..527688e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -88,6 +88,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
     private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
 
+    /** Flag indicating that near cache is enabled on originating node and it should be added as reader. */
+    private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 0x08;
+
     /** Prepared flag updater. */
     private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
         AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
@@ -275,6 +278,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      * @param filters Put filters.
      * @param conflictVer Data center replication version.
      * @param skipStore Skip store flag.
+     * @param addReader Add reader flag.
      */
     public IgniteTxEntry(GridCacheContext<?, ?> ctx,
         IgniteInternalTx tx,
@@ -287,7 +291,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         CacheEntryPredicate[] filters,
         GridCacheVersion conflictVer,
         boolean skipStore,
-        boolean keepBinary
+        boolean keepBinary,
+        boolean addReader
     ) {
         assert ctx != null;
         assert tx != null;
@@ -304,6 +309,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
         skipStore(skipStore);
         keepBinary(keepBinary);
+        addReader(addReader);
 
         if (entryProcessor != null)
             addEntryProcessor(entryProcessor, invokeArgs);
@@ -524,6 +530,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     }
 
     /**
+     * @param addReader Add reader flag.
+     */
+    public void addReader(boolean addReader) {
+        setFlag(addReader, TX_ENTRY_ADD_READER_FLAG_MASK);
+    }
+
+    /**
+     * @return Add reader flag.
+     */
+    public boolean addReader() {
+        return isFlag(TX_ENTRY_ADD_READER_FLAG_MASK);
+    }
+
+    /**
      * Sets flag mask.
      *
      * @param flag Set or clear.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ba3b2b6..35ee011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -249,6 +249,7 @@ public class IgniteTxHandler {
                         null,
                         e,
                         null,
+                        req.onePhaseCommit(),
                         req.deployInfo() != null);
                 }
             }
@@ -365,6 +366,7 @@ public class IgniteTxHandler {
                         null,
                         null,
                         top.topologyVersion(),
+                        req.onePhaseCommit(),
                         req.deployInfo() != null);
 
                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 36f5f2f..99402c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -133,11 +132,6 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
-        return cacheCtx != null && cacheCtx.isNear();
-    }
-
-    /** {@inheritDoc} */
     @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) {
         if (cacheCtx == null || cacheCtx.isLocal())
             return cctx.exchange().lastTopologyFuture();
@@ -294,11 +288,6 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
-        return cacheCtx != null ? ctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) : false;
-    }
-
-    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 52a0f56..e4b850d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1279,7 +1279,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         long drExpireTime,
         @Nullable GridCacheVersion drVer,
         boolean skipStore,
-        boolean keepBinary
+        boolean keepBinary,
+        boolean addReader
     ) {
         assert invokeArgs == null || op == TRANSFORM;
 
@@ -1355,7 +1356,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 filter,
                 drVer,
                 skipStore,
-                keepBinary);
+                keepBinary,
+                addReader);
 
             txEntry.conflictExpireTime(drExpireTime);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index fe9fcbd..123d396 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-
 /**
  *
  */
@@ -44,11 +41,4 @@ public interface IgniteTxLocalState extends IgniteTxState {
      *
      */
     public void seal();
-
-    /**
-     * @param ctx Context.
-     * @param topVer Topology version.
-     * @return {@code True} if tx has cache with created near cache.
-     */
-    public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 2532a92..86ae684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -67,13 +67,6 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
-        assert false;
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
     @Override public void addActiveCache(GridCacheContext cacheCtx, boolean recovery, IgniteTxLocalAdapter tx)
         throws IgniteCheckedException {
         assert false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index ed2526e..ee48ed7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -79,12 +79,6 @@ public interface IgniteTxState {
     public CacheWriteSynchronizationMode syncMode(GridCacheSharedContext cctx);
 
     /**
-     * @param cctx Context.
-     * @return {@code True} is tx has active near cache.
-     */
-    public boolean hasNearCache(GridCacheSharedContext cctx);
-
-    /**
      * @param cacheCtx Context.
      * @param tx Transaction.
      * @throws IgniteCheckedException If cache check failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 3679208..0cf66f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -28,8 +28,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -202,20 +200,6 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
-        for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = activeCacheIds.get(i);
-
-            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
-
-            if (cacheCtx.isNear())
-                return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
     @Override public void addActiveCache(GridCacheContext cacheCtx, boolean recovery, IgniteTxLocalAdapter tx)
         throws IgniteCheckedException {
         GridCacheSharedContext cctx = cacheCtx.shared();
@@ -476,22 +460,6 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
-        DiscoCache discoCache = ctx.discovery().discoCache(topVer);
-
-        assert discoCache != null : topVer;
-
-        for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = activeCacheIds.get(i);
-
-            if (discoCache.hasNearCache(cacheId))
-                return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java
new file mode 100644
index 0000000..ed347c1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+
+/**
+ *
+ */
+public class IgniteOnePhaseCommitNearReadersTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private boolean testSpi;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        if (testSpi) {
+            TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+            cfg.setCommunicationSpi(commSpi);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReadersUpdate1() throws Exception {
+        putReadersUpdate(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReadersUpdate2() throws Exception {
+        putReadersUpdate(0);
+    }
+
+    /**
+     * @param backups Backups number.
+     * @throws Exception If failed.
+     */
+    private void putReadersUpdate(int backups) throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        awaitPartitionMapExchange();
+
+        client = true;
+
+        Ignite srv = ignite(0);
+
+        srv.createCache(cacheConfiguration(backups));
+
+        Ignite client1 = startGrid(SRVS);
+
+        IgniteCache<Object, Object> cache1 = client1.createNearCache(DEFAULT_CACHE_NAME,
+            new NearCacheConfiguration<>());
+
+        Integer key = primaryKey(srv.cache(DEFAULT_CACHE_NAME));
+
+        Ignite client2 = startGrid(SRVS + 1);
+
+        IgniteCache<Object, Object> cache2 = client2.cache(DEFAULT_CACHE_NAME);
+
+        cache1.put(key, 1);
+
+        cache2.put(key, 2);
+
+        checkCacheData(F.asMap(key, 2), DEFAULT_CACHE_NAME);
+
+        int val = 10;
+
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                try (Transaction tx = client2.transactions().txStart(concurrency, isolation)) {
+                    cache2.put(key, val);
+
+                    tx.commit();
+                }
+
+                checkCacheData(F.asMap(key, val), DEFAULT_CACHE_NAME);
+
+                val++;
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReaderUpdatePrimaryFails1() throws Exception {
+        putReaderUpdatePrimaryFails(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReaderUpdatePrimaryFails2() throws Exception {
+        putReaderUpdatePrimaryFails(0);
+    }
+
+    /**
+     * @param backups Backups number.
+     * @throws Exception If failed.
+     */
+    private void putReaderUpdatePrimaryFails(int backups) throws Exception {
+        testSpi = true;
+
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        awaitPartitionMapExchange();
+
+        client = true;
+
+        Ignite srv = ignite(0);
+
+        srv.createCache(cacheConfiguration(backups));
+
+        Ignite client1 = startGrid(SRVS);
+
+        IgniteCache<Object, Object> cache1 = client1.createNearCache(DEFAULT_CACHE_NAME,
+            new NearCacheConfiguration<>());
+
+        Ignite client2 = startGrid(SRVS + 1);
+
+        client= false;
+
+        IgniteCache<Object, Object> cache2 = client2.cache(DEFAULT_CACHE_NAME);
+
+        Integer key = primaryKey(srv.cache(DEFAULT_CACHE_NAME));
+
+        cache1.put(key, 1);
+
+        spi(srv).blockMessages(GridNearTxPrepareResponse.class, client2.name());
+
+        IgniteFuture<?> fut = cache2.putAsync(key, 2);
+
+        U.sleep(1000);
+
+        assertFalse(fut.isDone());
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkCacheData(F.asMap(key, backups == 0 ? null : 2), DEFAULT_CACHE_NAME);
+
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                srv = startGrid(0);
+
+                awaitPartitionMapExchange();
+
+                key = primaryKey(srv.cache(DEFAULT_CACHE_NAME));
+
+                cache1.put(key, 1);
+
+                spi(srv).blockMessages(GridNearTxPrepareResponse.class, client2.name());
+
+                try (Transaction tx = client2.transactions().txStart(concurrency, isolation)) {
+                    cache2.putAsync(key, 2);
+
+                    fut = tx.commitAsync();
+
+                    U.sleep(1000);
+
+                    assertFalse(fut.isDone());
+
+                    stopGrid(0);
+
+                    if (backups == 0)
+                        fut.get();
+                    else {
+                        try {
+                            fut.get();
+
+                            fail();
+                        }
+                        catch (TransactionRollbackException ignore) {
+                            // Expected.
+                        }
+                    }
+                }
+
+                checkCacheData(F.asMap(key, backups == 0 ? null : 1), DEFAULT_CACHE_NAME);
+
+                try (Transaction tx = client2.transactions().txStart(concurrency, isolation)) {
+                    cache2.putAsync(key, 2);
+
+                    tx.commit();
+                }
+
+                checkCacheData(F.asMap(key, 2), DEFAULT_CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * @param backups Backups number.
+     * @return Configuration.
+     */
+    private CacheConfiguration cacheConfiguration(int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 943c5f5..43f6b13 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
 import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
+import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReadersTest;
 import org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest;
 import org.apache.ignite.internal.processors.cache.database.MemoryPolicyInitializationTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
@@ -270,6 +271,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
 
+        suite.addTest(new TestSuite(IgniteOnePhaseCommitNearReadersTest.class));
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca6cd142/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index f0c0c5a..975f951 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -91,6 +91,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("IgniteCache Test Suite part 3");
 
+        suite.addTestSuite(IgniteCacheGroupsTest.class);
+
         // Value consistency tests.
         suite.addTestSuite(GridCacheValueConsistencyAtomicSelfTest.class);
         suite.addTestSuite(GridCacheValueConsistencyAtomicNearEnabledSelfTest.class);
@@ -196,8 +198,6 @@ public class IgniteCacheTestSuite3 extends TestSuite {
 
         suite.addTestSuite(CacheAsyncOperationsTest.class);
 
-        suite.addTestSuite(IgniteCacheGroupsTest.class);
-
         return suite;
     }
 }


Mime
View raw message