ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5272 tx
Date Thu, 08 Jun 2017 15:27:36 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5272-tx [created] a74f5bf3c


ignite-5272 tx


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a74f5bf3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a74f5bf3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a74f5bf3

Branch: refs/heads/ignite-5272-tx
Commit: a74f5bf3c70340dfb398776a7ed871945529aa64
Parents: f03ecba
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Jun 8 17:10:32 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Jun 8 18:27:29 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |   3 +
 .../GridDistributedTxPrepareRequest.java        |  14 +-
 .../GridDistributedTxPrepareResponse.java       |   2 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  20 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |  16 +-
 .../distributed/dht/GridDhtGetSingleFuture.java |  10 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   2 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   3 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  11 +
 .../dht/GridPartitionedGetFuture.java           |   2 +
 .../dht/GridPartitionedSingleGetFuture.java     |   7 +-
 .../distributed/near/GridNearGetFuture.java     |   4 +-
 .../distributed/near/GridNearGetRequest.java    | 144 ++++++-------
 .../cache/distributed/near/GridNearTxLocal.java |  15 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |   5 +-
 .../near/GridNearTxPrepareResponse.java         |  14 ++
 .../cache/transactions/IgniteTxEntry.java       |  16 +-
 .../cache/transactions/IgniteTxHandler.java     |   2 +
 .../IgniteTxImplicitSingleStateImpl.java        |   5 -
 .../transactions/IgniteTxLocalAdapter.java      |   6 +-
 .../cache/transactions/IgniteTxLocalState.java  |   7 -
 .../cache/transactions/IgniteTxStateImpl.java   |  16 --
 .../IgniteOnePhaseCommitNearReadersTest.java    | 203 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 24 files changed, 371 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a251047..81d5b39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -390,6 +390,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
             unmarshall(nodeId, cacheMsg);
 
+            log.info("Message: " + cacheMsg);
+
             if (cacheMsg.classError() != null)
                 processFailedMessage(nodeId, cacheMsg, c);
             else
@@ -706,6 +708,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     null,
                     null,
                     null,
+                    false,
                     req.deployInfo() != null);
 
                 res.error(req.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 4b11414f..3205c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -689,15 +689,19 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         StringBuilder flags = new StringBuilder();
 
         if (needReturnValue())
-            flags.append("retVal");
+            appendFlag(flags, "retVal");
+
         if (isInvalidate())
-            flags.append("invalidate");
+            appendFlag(flags, "invalidate");
+
         if (onePhaseCommit())
-            flags.append("onePhase");
+            appendFlag(flags, "onePhase");
+
         if (last())
-            flags.append("last");
+            appendFlag(flags, "last");
+
         if (system())
-            flags.append("sys");
+            appendFlag(flags, "sys");
 
         return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this,
             "flags", flags.toString(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index 7cd3ad8..58e9492 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -55,7 +55,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
     private int part;
 
     /** */
-    private byte flags;
+    protected byte flags;
 
     /**
      * Empty constructor (required by {@link Externalizable}).

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 418d712..6427140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -726,15 +726,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         return sum;
     }
 
-    /**
-     * This method is used internally. Use
-     * {@link #getDhtAsync(UUID, long, Map, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean, boolean)}
-     * method instead to retrieve DHT value.
-     *  @param keys {@inheritDoc}
-     * @param forcePrimary {@inheritDoc}
-     * @param skipTx {@inheritDoc}
-     * @param needVer Need version.  @return {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(
         @Nullable Collection<? extends K> keys,
         boolean forcePrimary,
@@ -816,6 +808,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader,
         long msgId,
         Map<KeyCacheObject, Boolean> keys,
+        boolean addReaders,
         boolean readThrough,
         AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
@@ -834,7 +827,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             taskNameHash,
             expiry,
             skipVals,
-            recovery);
+            recovery,
+            addReaders);
 
         fut.init();
 
@@ -854,7 +848,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param skipVals Skip vals flag.
      * @return Future for the operation.
      */
-    private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync(
+    public GridDhtGetSingleFuture getDhtSingleAsync(
         UUID nodeId,
         long msgId,
         KeyCacheObject key,
@@ -867,7 +861,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         boolean skipVals,
         boolean recovery
     ) {
-        GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>(
+        GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>(
             ctx,
             msgId,
             nodeId,
@@ -992,7 +986,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      */
     protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) {
         assert ctx.affinityNode();
-        assert !req.reload() : req;
 
         final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl());
 
@@ -1000,6 +993,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             getDhtAsync(nodeId,
                 req.messageId(),
                 req.keys(),
+                req.addReaders(),
                 req.readThrough(),
                 req.topologyVersion(),
                 req.subjectId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 8031c8f..29bf3a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -112,6 +112,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
     /** */
     private final boolean recovery;
 
+    /** */
+    private final boolean addReaders;
+
     /**
      * @param cctx Context.
      * @param msgId Message ID.
@@ -135,7 +138,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         int taskNameHash,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean recovery
+        boolean recovery,
+        boolean addReaders
     ) {
         super(CU.<GridCacheEntryInfo>collectionsReducer(keys.size()));
 
@@ -153,6 +157,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
         this.recovery = recovery;
+        this.addReaders = addReaders;
 
         futId = IgniteUuid.randomUuid();
 
@@ -346,12 +351,13 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
 
         GridCompoundFuture<Boolean, Boolean> txFut = null;
 
-        ClusterNode readerNode = cctx.discovery().node(reader);
-
         ReaderArguments readerArgs = null;
 
-        if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+        if (addReaders && !skipVals && !cctx.localNodeId().equals(reader)) {
             for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
+                if (!k.getValue())
+                    continue;
+
                 while (true) {
                     GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
 
@@ -359,7 +365,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         if (e.obsolete())
                             continue;
 
-                        boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
+                        boolean addReader = !e.deleted();
 
                         if (addReader) {
                             e.unswap(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index b9007ba..095fe77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -73,7 +73,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
     private KeyCacheObject key;
 
     /** */
-    private boolean addRdr;
+    private final boolean addRdr;
 
     /** Reserved partitions. */
     private int part = -1;
@@ -123,7 +123,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
         long msgId,
         UUID reader,
         KeyCacheObject key,
-        Boolean addRdr,
+        boolean addRdr,
         boolean readThrough,
         @NotNull AffinityTopologyVersion topVer,
         @Nullable UUID subjId,
@@ -303,11 +303,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
 
         IgniteInternalFuture<Boolean> rdrFut = null;
 
-        ClusterNode readerNode = cctx.discovery().node(reader);
-
         ReaderArguments readerArgs = null;
 
-        if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) {
+        if (addRdr && !skipVals && !cctx.localNodeId().equals(reader)) {
             while (true) {
                 GridDhtCacheEntry e = cache().entryExx(key, topVer);
 
@@ -315,7 +313,7 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
                     if (e.obsolete())
                         continue;
 
-                    boolean addReader = (!e.deleted() && this.addRdr && !skipVals);
+                    boolean addReader = !e.deleted();
 
                     if (addReader) {
                         e.unswap(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 26f08fa..e1bc313 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -265,7 +265,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
     @Override @Nullable protected IgniteInternalFuture<Boolean> addReader(long msgId, GridDhtCacheEntry cached,
         IgniteTxEntry entry, AffinityTopologyVersion topVer) {
         // Don't add local node as reader.
-        if (!cctx.localNodeId().equals(nearNodeId)) {
+        if (entry.addReader() && !cctx.localNodeId().equals(nearNodeId)) {
             GridCacheContext cacheCtx = cached.context();
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 8b51cb5..8047bda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -612,7 +612,8 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
                         -1L,
                         null,
                         skipStore,
-                        keepBinary);
+                        keepBinary,
+                        CU.isNearEnabled(cacheCtx));
 
                     if (read)
                         txEntry.ttl(accessTtl);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 75f8366..ee1bb7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -853,6 +853,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             ret,
             prepErr,
             null,
+            tx.onePhaseCommit(),
             tx.activeCachesDeploymentEnabled());
 
         if (prepErr == null) {
@@ -1229,6 +1230,16 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 return;
 
             if (last) {
+                if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) {
+                    for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) {
+                        if (!tx.dhtMap().containsKey(nearMapping.primary().id())) {
+                            tx.onePhaseCommit(false);
+
+                            break;
+                        }
+                    }
+                }
+
                 int miniId = 0;
 
                 assert tx.transactionNodes() != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 5543cec..8db0ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -292,6 +292,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     cache().getDhtAsync(n.id(),
                         -1,
                         mappedKeys,
+                        false,
                         readThrough,
                         topVer,
                         subjId,
@@ -351,6 +352,7 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                     taskName == null ? 0 : taskName.hashCode(),
                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                     expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    false,
                     skipVals,
                     cctx.deploymentEnabled(),
                     recovery);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 3f612f7..a6883e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -221,11 +221,10 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
             return;
 
         if (node.isLocal()) {
-            Map<KeyCacheObject, Boolean> map = Collections.singletonMap(key, false);
-
-            final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtAsync(node.id(),
+            final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtSingleAsync(node.id(),
                 -1,
-                map,
+                key,
+                false,
                 readThrough,
                 topVer,
                 subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index c7c6e9e..d4769a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -314,6 +314,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     dht().getDhtAsync(n.id(),
                         -1,
                         mappedKeys,
+                        false,
                         readThrough,
                         topVer,
                         subjId,
@@ -379,6 +380,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                     taskName == null ? 0 : taskName.hashCode(),
                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                     expiryPlc != null ? expiryPlc.forAccess() : -1L,
+                    true,
                     skipVals,
                     cctx.deploymentEnabled(),
                     recovery);
@@ -445,7 +447,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         EntryGetResult res = entry.innerGetVersioned(
                             null,
                             null,
-                            /**update-metrics*/true,
+                            /*update-metrics*/true,
                             /*event*/!skipVals,
                             subjId,
                             null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 6092511..2ed46f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -55,6 +55,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int READ_THROUGH_FLAG_MASK = 0x01;
+
+    /** */
+    private static final int SKIP_VALS_FLAG_MASK = 0x02;
+
+    /** */
+    private static final int ADD_READER_FLAG_MASK = 0x04;
+
+    /** */
+    public static final int RECOVERY_FLAG_MASK = 0x08;
+
     /** Future ID. */
     private IgniteUuid futId;
 
@@ -75,19 +87,10 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
     /** */
     @GridDirectCollection(boolean.class)
-    private Collection<Boolean> flags;
-
-    /** Reload flag. */
-    private boolean reload;
+    private List<Boolean> readersFlags;
 
-    /** Read through flag. */
-    private boolean readThrough;
-
-    /** Skip values flag. Used for {@code containsKey} method. */
-    private boolean skipVals;
-
-    /** Recovery flag. */
-    private boolean recovery;
+    /** */
+    private byte flags;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
@@ -139,6 +142,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         int taskNameHash,
         long createTtl,
         long accessTtl,
+        boolean addReader,
         boolean skipVals,
         boolean addDepInfo,
         boolean recovery
@@ -153,22 +157,35 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         this.ver = ver;
 
         this.keys = new ArrayList<>(keys.size());
-        flags = new ArrayList<>(keys.size());
+
+        if (addReader)
+            readersFlags = new ArrayList<>(keys.size());
 
         for (Map.Entry<KeyCacheObject, Boolean> entry : keys.entrySet()) {
             this.keys.add(entry.getKey());
-            flags.add(entry.getValue());
+
+            if (addReader)
+                readersFlags.add(entry.getValue());
         }
 
-        this.readThrough = readThrough;
         this.topVer = topVer;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.createTtl = createTtl;
         this.accessTtl = accessTtl;
-        this.skipVals = skipVals;
         this.addDepInfo = addDepInfo;
-        this.recovery = recovery;
+
+        if (readThrough)
+            flags |= READ_THROUGH_FLAG_MASK;
+
+        if (skipVals)
+            flags |= SKIP_VALS_FLAG_MASK;
+
+        if (addReader)
+            flags |= ADD_READER_FLAG_MASK;
+
+        if (recovery)
+            flags |= RECOVERY_FLAG_MASK;
     }
 
     /**
@@ -214,17 +231,10 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
     }
 
     /**
-     * @return Reload flag.
-     */
-    public boolean reload() {
-        return reload;
-    }
-
-    /**
      * @return Read through flag.
      */
     public boolean readThrough() {
-        return readThrough;
+        return (flags & READ_THROUGH_FLAG_MASK) != 0;
     }
 
     /**
@@ -232,14 +242,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
      *      returned as future result.
      */
     public boolean skipValues() {
-        return skipVals;
+        return (flags & SKIP_VALS_FLAG_MASK) != 0;
     }
 
     /**
      * @return Recovery flag.
      */
     public boolean recovery() {
-        return recovery;
+        return (flags & RECOVERY_FLAG_MASK) != 0;
+    }
+
+    public boolean addReaders() {
+        return (flags & ADD_READER_FLAG_MASK) != 0;
     }
 
     /**
@@ -277,7 +291,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
         assert ctx != null;
         assert !F.isEmpty(keys);
-        assert keys.size() == flags.size();
+        assert readersFlags == null || keys.size() == readersFlags.size();
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
@@ -297,16 +311,18 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
         finishUnmarshalCacheObjects(keys, cctx, ldr);
 
         assert !F.isEmpty(keys);
-        assert keys.size() == flags.size();
+        assert readersFlags == null || keys.size() == readersFlags.size();
 
         if (keyMap == null) {
             keyMap = U.newLinkedHashMap(keys.size());
 
             Iterator<KeyCacheObject> keysIt = keys.iterator();
-            Iterator<Boolean> flagsIt = flags.iterator();
 
-            while (keysIt.hasNext())
-                keyMap.put(keysIt.next(), flagsIt.next());
+            for (int i = 0; i < keys.size(); i++) {
+                Boolean addRdr = readersFlags != null ? readersFlags.get(i) : Boolean.FALSE;
+
+                keyMap.put(keysIt.next(), addRdr);
+            }
         }
     }
 
@@ -343,7 +359,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("flags", flags, MessageCollectionItemType.BOOLEAN))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
@@ -367,48 +383,30 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeBoolean("readThrough", readThrough))
+                if (!writer.writeCollection("readersFlags", readersFlags, MessageCollectionItemType.BOOLEAN))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeBoolean("recovery", recovery))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeBoolean("reload", reload))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeBoolean("skipVals", skipVals))
-                    return false;
-
-                writer.incrementState();
-
-            case 13:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 11:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 15:
+            case 12:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
-            case 16:
+            case 13:
                 if (!writer.writeMessage("ver", ver))
                     return false;
 
@@ -447,7 +445,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 5:
-                flags = reader.readCollection("flags", MessageCollectionItemType.BOOLEAN);
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -479,7 +477,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 9:
-                readThrough = reader.readBoolean("readThrough");
+                readersFlags = reader.readCollection("readersFlags", MessageCollectionItemType.BOOLEAN);
 
                 if (!reader.isLastRead())
                     return false;
@@ -487,30 +485,6 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
                 reader.incrementState();
 
             case 10:
-                recovery = reader.readBoolean("recovery");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                reload = reader.readBoolean("reload");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                skipVals = reader.readBoolean("skipVals");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 13:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -518,7 +492,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
                 reader.incrementState();
 
-            case 14:
+            case 11:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -526,7 +500,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
                 reader.incrementState();
 
-            case 15:
+            case 12:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -534,7 +508,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
                 reader.incrementState();
 
-            case 16:
+            case 13:
                 ver = reader.readMessage("ver");
 
                 if (!reader.isLastRead())
@@ -554,7 +528,7 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 17;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8c10e53..55905b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1217,7 +1217,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     drExpireTime,
                                     drVer,
                                     skipStore,
-                                    keepBinary);
+                                    keepBinary,
+                                    CU.isNearEnabled(cacheCtx));
                             }
                             else {
                                 txEntry = addEntry(READ,
@@ -1232,7 +1233,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                     -1L,
                                     null,
                                     skipStore,
-                                    keepBinary);
+                                    keepBinary,
+                                    CU.isNearEnabled(cacheCtx));
                             }
 
                             txEntry.markValid();
@@ -1262,7 +1264,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                         drExpireTime,
                         drVer,
                         skipStore,
-                        keepBinary);
+                        keepBinary,
+                        CU.isNearEnabled(cacheCtx));
 
                     if (enlisted != null)
                         enlisted.add(cacheKey);
@@ -1365,7 +1368,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                     drExpireTime,
                     drVer,
                     skipStore,
-                    keepBinary);
+                    keepBinary,
+                    CU.isNearEnabled(cacheCtx));
 
                 if (enlisted != null)
                     enlisted.add(cacheKey);
@@ -2201,7 +2205,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
                                 -1L,
                                 null,
                                 skipStore,
-                                !deserializeBinary);
+                                !deserializeBinary,
+                                CU.isNearEnabled(cacheCtx));
 
                             // As optimization, mark as checked immediately
                             // for non-pessimistic if value is not null.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 004e4da..a94d6fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -172,7 +172,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
 
             Collection<UUID> backups = entry.getValue();
 
-            if (backups.size() <= 1 && !tx.txState().hasNearCacheConfigured(cctx, tx.topologyVersion()))
+            if (backups.size() <= 1)
                 tx.onePhaseCommit(true);
         }
     }
@@ -191,6 +191,9 @@ public abstract class GridNearTxPrepareFutureAdapter extends
 
         assert res.error() == null : res;
 
+        if (tx.onePhaseCommit() && !res.onePhaseCommit())
+            tx.onePhaseCommit(false);
+
         UUID nodeId = m.primary().id();
 
         for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index ceab1f1..9036e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -52,6 +52,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** One-phase commit flag bit mask. */
+    private static final int NEAR_PREPARE_ONE_PHASE_COMMIT_FLAG_MASK = 0x01;
+
     /** Collection of versions that are pending and less than lock version. */
     @GridToStringInclude
     @GridDirectCollection(GridCacheVersion.class)
@@ -123,6 +126,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         GridCacheReturn retVal,
         Throwable err,
         AffinityTopologyVersion clientRemapVer,
+        boolean onePhaseCommit,
         boolean addDepInfo
     ) {
         super(part, xid, err, addDepInfo);
@@ -136,6 +140,16 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         this.writeVer = writeVer;
         this.retVal = retVal;
         this.clientRemapVer = clientRemapVer;
+
+        if (onePhaseCommit)
+            flags |= NEAR_PREPARE_ONE_PHASE_COMMIT_FLAG_MASK;
+    }
+
+    /**
+     * @return One-phase commit state on primary node.
+     */
+    public boolean onePhaseCommit() {
+        return isFlag(NEAR_PREPARE_ONE_PHASE_COMMIT_FLAG_MASK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 30aa335..0ee1b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -88,6 +88,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     /** Flag indicating that old value for 'invoke' operation was non null on primary node. */
     private static final int TX_ENTRY_OLD_VAL_ON_PRIMARY = 0x04;
 
+    /** Flag indicating that near cache is enabled on originating node and it should be added as reader. */
+    private static final int TX_ENTRY_ADD_READER_FLAG_MASK = 0x08;
+
     /** Prepared flag updater. */
     private static final AtomicIntegerFieldUpdater<IgniteTxEntry> PREPARED_UPD =
         AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared");
@@ -275,6 +278,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
      * @param filters Put filters.
      * @param conflictVer Data center replication version.
      * @param skipStore Skip store flag.
+     * @param addReader Add reader flag.
      */
     public IgniteTxEntry(GridCacheContext<?, ?> ctx,
         IgniteInternalTx tx,
@@ -287,7 +291,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         CacheEntryPredicate[] filters,
         GridCacheVersion conflictVer,
         boolean skipStore,
-        boolean keepBinary
+        boolean keepBinary,
+        boolean addReader
     ) {
         assert ctx != null;
         assert tx != null;
@@ -304,6 +309,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
         skipStore(skipStore);
         keepBinary(keepBinary);
+        addReader(addReader);
 
         if (entryProcessor != null)
             addEntryProcessor(entryProcessor, invokeArgs);
@@ -523,6 +529,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         return isFlag(TX_ENTRY_KEEP_BINARY_FLAG_MASK);
     }
 
+    public void addReader(boolean addReader) {
+        setFlag(addReader, TX_ENTRY_ADD_READER_FLAG_MASK);
+    }
+
+    public boolean addReader() {
+        return isFlag(TX_ENTRY_ADD_READER_FLAG_MASK);
+    }
+
     /**
      * Sets flag mask.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ba3b2b6..35ee011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -249,6 +249,7 @@ public class IgniteTxHandler {
                         null,
                         e,
                         null,
+                        req.onePhaseCommit(),
                         req.deployInfo() != null);
                 }
             }
@@ -365,6 +366,7 @@ public class IgniteTxHandler {
                         null,
                         null,
                         top.topologyVersion(),
+                        req.onePhaseCommit(),
                         req.deployInfo() != null);
 
                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 36f5f2f..5eb76a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -294,11 +294,6 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
-        return cacheCtx != null ? ctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) : false;
-    }
-
-    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 52a0f56..e4b850d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1279,7 +1279,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         long drExpireTime,
         @Nullable GridCacheVersion drVer,
         boolean skipStore,
-        boolean keepBinary
+        boolean keepBinary,
+        boolean addReader
     ) {
         assert invokeArgs == null || op == TRANSFORM;
 
@@ -1355,7 +1356,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 filter,
                 drVer,
                 skipStore,
-                keepBinary);
+                keepBinary,
+                addReader);
 
             txEntry.conflictExpireTime(drExpireTime);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index fe9fcbd..1c3677c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -44,11 +44,4 @@ public interface IgniteTxLocalState extends IgniteTxState {
      *
      */
     public void seal();
-
-    /**
-     * @param ctx Context.
-     * @param topVer Topology version.
-     * @return {@code True} if tx has cache with created near cache.
-     */
-    public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 3679208..173e8f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -476,22 +476,6 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) {
-        DiscoCache discoCache = ctx.discovery().discoCache(topVer);
-
-        assert discoCache != null : topVer;
-
-        for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = activeCacheIds.get(i);
-
-            if (discoCache.hasNearCache(cacheId))
-                return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java
new file mode 100644
index 0000000..7e06f6d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitNearReadersTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+
+/**
+ * Tests transactional puts to a key held in a client near cache, verifying cache data afterwards.
+ */
+public class IgniteOnePhaseCommitNearReadersTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private boolean testSpi;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        if (testSpi) {
+            TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+            cfg.setCommunicationSpi(commSpi);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReadersUpdate1() throws Exception {
+        putReadersUpdate(1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReadersUpdate2() throws Exception {
+        putReadersUpdate(0);
+    }
+
+    /**
+     * @param backups Backups number.
+     * @throws Exception If failed.
+     */
+    private void putReadersUpdate(int backups) throws Exception {
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        awaitPartitionMapExchange();
+
+        client = true;
+
+        Ignite srv = ignite(0);
+
+        srv.createCache(cacheConfiguration(backups));
+
+        Ignite client1 = startGrid(SRVS);
+
+        IgniteCache<Object, Object> cache1 = client1.createNearCache(DEFAULT_CACHE_NAME,
+            new NearCacheConfiguration<>());
+
+        Integer key = primaryKey(srv.cache(DEFAULT_CACHE_NAME));
+
+        Ignite client2 = startGrid(SRVS + 1);
+
+        IgniteCache<Object, Object> cache2 = client2.cache(DEFAULT_CACHE_NAME);
+
+        cache1.put(key, 1);
+
+        cache2.put(key, 2);
+
+        checkCacheData(F.asMap(key, 2), DEFAULT_CACHE_NAME);
+
+        int val = 10;
+
+        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
+            for (TransactionIsolation isolation : TransactionIsolation.values()) {
+                try (Transaction tx = client2.transactions().txStart(concurrency, isolation)) {
+                    cache2.put(key, val);
+
+                    tx.commit();
+                }
+
+                checkCacheData(F.asMap(key, val), DEFAULT_CACHE_NAME);
+
+                val++;
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReaderUpdatePrimaryFails1() throws Exception {
+        testSpi = true;
+
+        final int SRVS = 3;
+
+        startGrids(SRVS);
+
+        awaitPartitionMapExchange();
+
+        client = true;
+
+        Ignite srv = ignite(0);
+
+        srv.createCache(cacheConfiguration(1));
+
+        Ignite client1 = startGrid(SRVS);
+
+        IgniteCache<Object, Object> cache1 = client1.createNearCache(DEFAULT_CACHE_NAME,
+            new NearCacheConfiguration<>());
+
+        Integer key = primaryKey(srv.cache(DEFAULT_CACHE_NAME));
+
+        Ignite client2 = startGrid(SRVS + 1);
+
+        IgniteCache<Object, Object> cache2 = client2.cache(DEFAULT_CACHE_NAME);
+
+        cache1.put(key, 1);
+
+        spi(srv).blockMessages(GridNearTxPrepareResponse.class, client2.name());
+
+        IgniteFuture<?> fut = cache2.putAsync(key, 2);
+
+        U.sleep(2000);
+
+        assertFalse(fut.isDone());
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkCacheData(F.asMap(key, 2), DEFAULT_CACHE_NAME);
+    }
+
+    /**
+     * @param backups Backups number.
+     * @return Configuration.
+     */
+    private CacheConfiguration cacheConfiguration(int backups) {
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a74f5bf3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 943c5f5..43f6b13 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
 import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
 import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
+import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitNearReadersTest;
 import org.apache.ignite.internal.processors.cache.MemoryPolicyConfigValidationTest;
 import org.apache.ignite.internal.processors.cache.database.MemoryPolicyInitializationTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheLoadingConcurrentGridStartSelfTest;
@@ -270,6 +271,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
 
+        suite.addTest(new TestSuite(IgniteOnePhaseCommitNearReadersTest.class));
+
         return suite;
     }
 }


Mime
View raw message