ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [29/33] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode
Date Wed, 28 Oct 2015 10:18:34 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index fe519a7..3c3527a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -221,34 +221,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         return true;
     }
 
-    /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked", "RedundantCast"})
-    @Override public IgniteInternalFuture<Object> readThroughAllAsync(
-        Collection<KeyCacheObject> keys,
-        boolean reload,
-        boolean skipVals,
-        IgniteInternalTx tx,
-        @Nullable UUID subjId,
-        String taskName,
-        IgniteBiInClosure<KeyCacheObject, Object> vis
-    ) {
-        return (IgniteInternalFuture)loadAsync(tx,
-            keys,
-            reload,
-            /*force primary*/false,
-            subjId,
-            taskName,
-            /*deserialize portable*/true,
-            /*expiry policy*/null,
-            skipVals,
-            /*skip store*/false,
-            /*can remap*/true);
-    }
-
     /**
      * @param tx Transaction.
      * @param keys Keys to load.
-     * @param reload Reload flag.
      * @param forcePrimary Force primary flag.
      * @param subjId Subject ID.
      * @param taskName Task name.
@@ -256,11 +231,11 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
      * @param expiryPlc Expiry policy.
      * @param skipVal Skip value flag.
      * @param skipStore Skip store flag.
+     * @param canRemap Can remap flag.
      * @return Loaded values.
      */
     public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable IgniteInternalTx tx,
         @Nullable Collection<KeyCacheObject> keys,
-        boolean reload,
         boolean forcePrimary,
         @Nullable UUID subjId,
         String taskName,
@@ -280,7 +255,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
             !skipStore,
-            reload,
             forcePrimary,
             txx,
             subjId,
@@ -288,7 +262,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
             deserializePortable,
             expiry,
             skipVal,
-            canRemap);
+            canRemap,
+            false,
+            false);
 
         // init() will register future for responses if future has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 2ae03d3..d558cc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"})
 public class GridNearCacheEntry extends GridDistributedCacheEntry {
     /** */
-    private static final int NEAR_SIZE_OVERHEAD = 36;
+    private static final int NEAR_SIZE_OVERHEAD = 36 + 16;
 
     /** Topology version at the moment when value was initialized from primary node. */
     private volatile long topVer = -1L;
@@ -58,6 +58,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     /** Partition. */
     private int part;
 
+    /** Number of outstanding eviction reservations; eviction is disabled while this is positive. */
+    private short evictReservations;
+
     /**
      * @param ctx Cache context.
      * @param key Cache key.
@@ -316,15 +319,21 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         primaryNode(primaryNodeId, topVer);
     }
 
-    /**
-     * This method should be called only when committing optimistic transactions.
-     *
+    /**
      * @param dhtVer DHT version to record.
+     * @return {@code False} if given version is lower than existing version.
      */
-    public synchronized void recordDhtVersion(GridCacheVersion dhtVer) {
-        // Version manager must be updated separately, when adding DHT version
-        // to transaction entries.
-        this.dhtVer = dhtVer;
+    public final boolean recordDhtVersion(GridCacheVersion dhtVer) {
+        assert dhtVer != null;
+        assert Thread.holdsLock(this);
+
+        if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) <= 0) {
+            this.dhtVer = dhtVer;
+
+            return true;
+        }
+
+        return false;
     }
 
     /** {@inheritDoc} */
@@ -332,7 +341,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         UUID subjId, String taskName) throws IgniteCheckedException {
         return cctx.near().loadAsync(tx,
             F.asList(key),
-            reload,
             /*force primary*/false,
             subjId,
             taskName,
@@ -350,7 +358,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
      * @param val New value.
      * @param ver Version to use.
      * @param dhtVer DHT version received from remote node.
-     * @param expVer Optional version to match.
      * @param ttl Time to live.
      * @param expireTime Expiration time.
      * @param evt Event flag.
@@ -366,14 +373,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
         CacheObject val,
         GridCacheVersion ver,
         GridCacheVersion dhtVer,
-        @Nullable GridCacheVersion expVer,
         long ttl,
         long expireTime,
         boolean evt,
         AffinityTopologyVersion topVer,
         UUID subjId)
         throws IgniteCheckedException, GridCacheEntryRemovedException {
-        boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion());
+        assert dhtVer != null;
 
         GridCacheVersion enqueueVer = null;
 
@@ -389,28 +395,25 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
                 CacheObject old = this.val;
                 boolean hasVal = hasValueUnlocked();
 
-                if (isNew() || !valid || expVer == null || expVer.equals(this.dhtVer)) {
+                if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) {
                     primaryNode(primaryNodeId, topVer);
 
-                    // Change entry only if dht version has changed.
-                    if (!dhtVer.equals(dhtVersion())) {
-                        update(val, expireTime, ttl, ver);
+                    update(val, expireTime, ttl, ver);
 
-                        if (cctx.deferredDelete() && !isInternal()) {
-                            boolean deleted = val == null;
+                    if (cctx.deferredDelete() && !isInternal()) {
+                        boolean deleted = val == null;
 
-                            if (deleted != deletedUnlocked()) {
-                                deletedUnlocked(deleted);
+                        if (deleted != deletedUnlocked()) {
+                            deletedUnlocked(deleted);
 
-                                if (deleted)
-                                    enqueueVer = ver;
-                            }
+                            if (deleted)
+                                enqueueVer = ver;
                         }
+                    }
 
-                        recordDhtVersion(dhtVer);
+                    this.dhtVer = dhtVer;
 
-                        ret = true;
-                    }
+                    ret = true;
                 }
 
                 if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
@@ -647,6 +650,32 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
     }
 
     /**
+     * @throws GridCacheEntryRemovedException If entry was removed.
+     */
+    public synchronized void reserveEviction() throws GridCacheEntryRemovedException {
+        checkObsolete();
+
+        evictReservations++;
+    }
+
+    /**
+     * Releases one eviction reservation by decrementing the reservation counter.
+     */
+    public synchronized void releaseEviction() {
+        assert evictReservations > 0 : this;
+        assert !obsolete() : this;
+
+        evictReservations--;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean evictionDisabled() {
+        assert Thread.holdsLock(this);
+
+        return evictReservations > 0;
+    }
+
+    /**
      * @param nodeId Primary node ID.
      * @param topVer Topology version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index eca2f71..ae1d43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -24,7 +24,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -38,19 +37,17 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -59,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -66,83 +64,31 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
-import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
 
 /**
  *
  */
-public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>> {
+public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Default max remap count value. */
-    public static final int DFLT_MAX_REMAP_CNT = 3;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
     /** Logger. */
     private static IgniteLogger log;
 
-    /** Maximum number of attempts to remap key to the same primary node. */
-    private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
-
-    /** Context. */
-    private final GridCacheContext<K, V> cctx;
-
-    /** Keys. */
-    private Collection<KeyCacheObject> keys;
-
-    /** Reload flag. */
-    private boolean reload;
-
-    /** Read through flag. */
-    private boolean readThrough;
-
-    /** Force primary flag. */
-    private boolean forcePrimary;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Version. */
-    private GridCacheVersion ver;
-
     /** Transaction. */
     private IgniteTxLocalEx tx;
 
-    /** Trackable flag. */
-    private boolean trackable;
-
-    /** Remap count. */
-    private AtomicInteger remapCnt = new AtomicInteger();
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name. */
-    private String taskName;
-
-    /** Whether to deserialize portable objects. */
-    private boolean deserializePortable;
-
-    /** Skip values flag. */
-    private boolean skipVals;
-
-    /** Expiry policy. */
-    private IgniteCacheExpiryPolicy expiryPlc;
-
-    /** Flag indicating that get should be done on a locked topology version. */
-    private final boolean canRemap;
+    /** Version. */
+    private GridCacheVersion ver;
 
     /**
      * @param cctx Context.
      * @param keys Keys.
      * @param readThrough Read through flag.
-     * @param reload Reload flag.
      * @param forcePrimary If {@code true} get will be performed on primary node even if
      *      called on backup node.
      * @param tx Transaction.
@@ -151,12 +97,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObjects Keep cache objects flag.
      */
     public GridNearGetFuture(
         GridCacheContext<K, V> cctx,
         Collection<KeyCacheObject> keys,
         boolean readThrough,
-        boolean reload,
         boolean forcePrimary,
         @Nullable IgniteTxLocalEx tx,
         @Nullable UUID subjId,
@@ -164,24 +112,26 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean canRemap
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObjects
     ) {
-        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
+        super(cctx,
+            keys,
+            readThrough,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiryPlc,
+            skipVals,
+            canRemap,
+            needVer,
+            keepCacheObjects);
 
         assert !F.isEmpty(keys);
 
-        this.cctx = cctx;
-        this.keys = keys;
-        this.readThrough = readThrough;
-        this.reload = reload;
-        this.forcePrimary = forcePrimary;
         this.tx = tx;
-        this.subjId = subjId;
-        this.taskName = taskName;
-        this.deserializePortable = deserializePortable;
-        this.expiryPlc = expiryPlc;
-        this.skipVals = skipVals;
-        this.canRemap = canRemap;
 
         futId = IgniteUuid.randomUuid();
 
@@ -318,16 +268,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
 
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(affNodes.size());
 
-        Map<KeyCacheObject, GridCacheVersion> savedVers = null;
+        Map<KeyCacheObject, GridNearCacheEntry> savedEntries = null;
 
         // Assign keys to primary nodes.
         for (KeyCacheObject key : keys)
-            savedVers = map(key, mappings, topVer, mapped, savedVers);
+            savedEntries = map(key, mappings, topVer, mapped, savedEntries);
 
         if (isDone())
             return;
 
-        final Map<KeyCacheObject, GridCacheVersion> saved = savedVers;
+        final Map<KeyCacheObject, GridNearCacheEntry> saved = savedEntries != null ? savedEntries :
+            Collections.<KeyCacheObject, GridNearCacheEntry>emptyMap();
 
         final int keysSize = keys.size();
 
@@ -346,7 +297,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         -1,
                         mappedKeys,
                         readThrough,
-                        reload,
                         topVer,
                         subjId,
                         taskName == null ? 0 : taskName.hashCode(),
@@ -405,7 +355,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     ver,
                     mappedKeys,
                     readThrough,
-                    reload,
                     topVer,
                     subjId,
                     taskName == null ? 0 : taskName.hashCode(),
@@ -434,43 +383,64 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * @param key Key to map.
      * @param topVer Topology version
      * @param mapped Previously mapped.
-     * @param savedVers Saved versions.
+     * @param saved Reserved near cache entries.
      * @return Map.
      */
-    private Map<KeyCacheObject, GridCacheVersion> map(
+    private Map<KeyCacheObject, GridNearCacheEntry> map(
         KeyCacheObject key,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
         AffinityTopologyVersion topVer,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped,
-        Map<KeyCacheObject, GridCacheVersion> savedVers
+        Map<KeyCacheObject, GridNearCacheEntry> saved
     ) {
         final GridNearCacheAdapter near = cache();
 
         // Allow to get cached value from the local node.
         boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
 
-        GridCacheEntryEx entry = allowLocRead ? near.peekEx(key) : null;
-
         while (true) {
+            GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null;
+
             try {
                 CacheObject v = null;
+                GridCacheVersion ver = null;
 
                 boolean isNear = entry != null;
 
                 // First we peek into near cache.
-                if (isNear)
-                    v = entry.innerGet(tx,
-                        /*swap*/false,
-                        /*read-through*/false,
-                        /*fail-fast*/true,
-                        /*unmarshal*/true,
-                        /*metrics*/true,
-                        /*events*/!skipVals,
-                        /*temporary*/false,
-                        subjId,
-                        null,
-                        taskName,
-                        expiryPlc);
+                if (isNear) {
+                    if (needVer) {
+                        T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                            null,
+                            /*swap*/true,
+                            /*unmarshal*/true,
+                            /*update-metrics*/true,
+                            /*event*/!skipVals,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc);
+
+                        if (res != null) {
+                            v = res.get1();
+                            ver = res.get2();
+                        }
+                    }
+                    else {
+                        v = entry.innerGet(tx,
+                            /*swap*/false,
+                            /*read-through*/false,
+                            /*fail-fast*/true,
+                            /*unmarshal*/true,
+                            /*metrics*/true,
+                            /*events*/!skipVals,
+                            /*temporary*/false,
+                            subjId,
+                            null,
+                            taskName,
+                            expiryPlc);
+                    }
+                }
 
                 ClusterNode affNode = null;
 
@@ -486,18 +456,37 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         if (dhtEntry != null) {
                             boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
 
-                            v = dhtEntry.innerGet(tx,
-                                /*swap*/true,
-                                /*read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /*update-metrics*/false,
-                                /*events*/!isNear && !skipVals,
-                                /*temporary*/false,
-                                subjId,
-                                null,
-                                taskName,
-                                expiryPlc);
+                            if (needVer) {
+                                T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+                                    null,
+                                    /*swap*/true,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*event*/!isNear && !skipVals,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiryPlc);
+
+                                if (res != null) {
+                                    v = res.get1();
+                                    ver = res.get2();
+                                }
+                            }
+                            else {
+                                v = dhtEntry.innerGet(tx,
+                                    /*swap*/true,
+                                    /*read-through*/false,
+                                    /*fail-fast*/true,
+                                    /*unmarshal*/true,
+                                    /*update-metrics*/false,
+                                    /*events*/!isNear && !skipVals,
+                                    /*temporary*/false,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiryPlc);
+                            }
 
                             // Entry was not in memory or in swap, so we remove it from cache.
                             if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver))
@@ -515,7 +504,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                                     "(all partition nodes left the grid)."));
 
-                                return savedVers;
+                                return saved;
                             }
 
                             if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
@@ -534,14 +523,29 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     }
                 }
 
-                if (v != null && !reload) {
-                    K key0 = key.value(cctx.cacheObjectContext(), true);
-                    V val0 = v.value(cctx.cacheObjectContext(), true);
+                if (v != null) {
+                    if (needVer) {
+                        V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+                        add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+                    }
+                    else {
+                        if (keepCacheObjects) {
+                            K key0 = (K)key;
+                            V val0 = (V)(skipVals ? true : v);
+
+                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+                        }
+                        else {
+                            K key0 = key.value(cctx.cacheObjectContext(), true);
+                            V val0 = v.value(cctx.cacheObjectContext(), true);
 
-                    val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
-                    key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
+                            val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
+                            key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
 
-                    add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+                            add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
+                        }
+                    }
                 }
                 else {
                     if (affNode == null) {
@@ -551,19 +555,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                             onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                                 "(all partition nodes left the grid)."));
 
-                            return savedVers;
+                            return saved;
                         }
                     }
 
-                    GridNearCacheEntry nearEntry = allowLocRead ? near.peekExx(key) : null;
-
-                    entry = nearEntry;
-
-                    if (savedVers == null)
-                        savedVers = U.newHashMap(3);
-
-                    savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion());
-
                     LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
 
                     if (keys != null && keys.containsKey(key)) {
@@ -572,10 +567,23 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                                 MAX_REMAP_CNT + " attempts (key got remapped to the same node) " +
                                 "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']'));
 
-                            return savedVers;
+                            return saved;
                         }
                     }
 
+                    if (!cctx.affinity().localNode(key, topVer)) {
+                        GridNearCacheEntry nearEntry = entry != null ? entry : near.entryExx(key, topVer);
+
+                        nearEntry.reserveEviction();
+
+                        entry = null;
+
+                        if (saved == null)
+                            saved = U.newHashMap(3);
+
+                        saved.put(key, nearEntry);
+                    }
+
                     // Don't add reader if transaction acquires lock anyway to avoid deadlock.
                     boolean addRdr = tx == null || tx.optimistic();
 
@@ -598,21 +606,15 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                 break;
             }
             catch (GridCacheEntryRemovedException ignored) {
-                entry = allowLocRead ? near.peekEx(key) : null;
-            }
-            catch (GridCacheFilterFailedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Filter validation failed for entry: " + e);
-
-                break;
+                // Retry.
             }
             finally {
-                if (entry != null && !reload && tx == null)
+                if (entry != null && tx == null)
                     cctx.evicts().touch(entry, topVer);
             }
         }
 
-        return savedVers;
+        return saved;
     }
 
     /**
@@ -655,7 +657,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
      * @param nodeId Node id.
      * @param keys Keys.
      * @param infos Entry infos.
-     * @param savedVers Saved versions.
+     * @param savedEntries Saved entries.
      * @param topVer Topology version
      * @return Result map.
      */
@@ -663,7 +665,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         UUID nodeId,
         Collection<KeyCacheObject> keys,
         Collection<GridCacheEntryInfo> infos,
-        Map<KeyCacheObject, GridCacheVersion> savedVers,
+        Map<KeyCacheObject, GridNearCacheEntry> savedEntries,
         AffinityTopologyVersion topVer
     ) {
         boolean empty = F.isEmpty(keys);
@@ -681,9 +683,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
 
                     // Entries available locally in DHT should not be loaded into near cache for reading.
                     if (!cctx.affinity().localNode(info.key(), cctx.affinity().affinityTopologyVersion())) {
-                        GridNearCacheEntry entry = cache().entryExx(info.key(), topVer);
+                        GridNearCacheEntry entry = savedEntries.get(info.key());
 
-                        GridCacheVersion saved = savedVers.get(info.key());
+                        if (entry == null)
+                            entry = cache().entryExx(info.key(), topVer);
 
                         // Load entry into cache.
                         entry.loadedValue(tx,
@@ -691,14 +694,11 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                             info.value(),
                             atomic ? info.version() : ver,
                             info.version(),
-                            saved,
                             info.ttl(),
                             info.expireTime(),
                             true,
                             topVer,
                             subjId);
-
-                        cctx.evicts().touch(entry, topVer);
                     }
 
                     CacheObject val = info.value();
@@ -706,7 +706,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
 
                     assert skipVals == (info.value() == null);
 
-                    cctx.addResult(map, key, val, skipVals, false, deserializePortable, false);
+                    if (needVer)
+                        versionedResult(map, key, val, info.version());
+                    else
+                        cctx.addResult(map,
+                            key,
+                            val,
+                            skipVals,
+                            keepCacheObjects,
+                            deserializePortable,
+                            false);
                 }
                 catch (GridCacheEntryRemovedException ignore) {
                     if (log.isDebugEnabled())
@@ -724,6 +733,26 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         return map;
     }
 
+    /**
+     * @param keys Keys.
+     * @param saved Saved entries.
+     * @param topVer Topology version.
+     */
+    private void releaseEvictions(Collection<KeyCacheObject> keys,
+        Map<KeyCacheObject, GridNearCacheEntry> saved,
+        AffinityTopologyVersion topVer) {
+        for (KeyCacheObject key : keys) {
+            GridNearCacheEntry entry = saved.get(key);
+
+            if (entry != null) {
+                entry.releaseEviction();
+
+                if (tx == null)
+                    cctx.evicts().touch(entry, topVer);
+            }
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@@ -763,7 +792,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         private LinkedHashMap<KeyCacheObject, Boolean> keys;
 
         /** Saved entry versions. */
-        private Map<KeyCacheObject, GridCacheVersion> savedVers;
+        private Map<KeyCacheObject, GridNearCacheEntry> savedEntries;
 
         /** Topology version on which this future was mapped. */
         private AffinityTopologyVersion topVer;
@@ -774,18 +803,18 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
         /**
          * @param node Node.
          * @param keys Keys.
-         * @param savedVers Saved entry versions.
+         * @param savedEntries Saved entries.
          * @param topVer Topology version.
          */
         MiniFuture(
             ClusterNode node,
             LinkedHashMap<KeyCacheObject, Boolean> keys,
-            Map<KeyCacheObject, GridCacheVersion> savedVers,
+            Map<KeyCacheObject, GridNearCacheEntry> savedEntries,
             AffinityTopologyVersion topVer
         ) {
             this.node = node;
             this.keys = keys;
-            this.savedVers = savedVers;
+            this.savedEntries = savedEntries;
             this.topVer = topVer;
         }
 
@@ -821,6 +850,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
             onDone(e);
         }
 
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Map<K, V> res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                releaseEvictions(keys.keySet(), savedEntries, topVer);
+
+                return true;
+            }
+            else
+                return false;
+        }
+
         /**
          * @param e Topology exception.
          */
@@ -915,7 +955,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                     }), F.t(node, keys), topVer);
 
                     // It is critical to call onDone after adding futures to compound list.
-                    onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+                    onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));
 
                     return;
                 }
@@ -935,12 +975,12 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
                         }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer));
 
                         // It is critical to call onDone after adding futures to compound list.
-                        onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+                        onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));
                     }
                 });
             }
             else
-                onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer));
+                onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index ff6375a..8482217 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -109,7 +109,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
      * @param ver Version.
      * @param keys Keys.
      * @param readThrough Read through flag.
-     * @param reload Reload flag.
      * @param skipVals Skip values flag. When false, only boolean values will be returned indicating whether
      *      cache entry has a value.
      * @param topVer Topology version.
@@ -125,7 +124,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
         GridCacheVersion ver,
         LinkedHashMap<KeyCacheObject, Boolean> keys,
         boolean readThrough,
-        boolean reload,
         @NotNull AffinityTopologyVersion topVer,
         UUID subjId,
         int taskNameHash,
@@ -145,7 +143,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep
         this.keys = keys.keySet();
         this.flags = keys.values();
         this.readThrough = readThrough;
-        this.reload = reload;
         this.topVer = topVer;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
new file mode 100644
index 0000000..47c1d21
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -0,0 +1,930 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+
+/**
+ * Near-side prepare future for optimistic serializable transactions.
+ */
+public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter
+    implements GridCacheMvccFuture<IgniteInternalTx> {
+    /** First node version that supports optimistic serializable transactions. */
+    public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+    /** Tracks lock acquisition for near/local keys added during mapping. */
+    @GridToStringExclude
+    private KeyLockFuture keyLockFut = new KeyLockFuture();
+
+    /** Lazily created when a prepare response carries a client remap version. */
+    @GridToStringExclude
+    private ClientRemapFuture remapFut;
+
+    /**
+     * @param cctx Context.
+     * @param tx Transaction.
+     */
+    public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) {
+        super(cctx, tx);
+
+        assert tx.optimistic() && tx.serializable() : tx;
+
+        // Should wait for all mini futures completion before finishing tx.
+        ignoreChildFailures(IgniteCheckedException.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) {
+        if (log.isDebugEnabled())
+            log.debug("Transaction future received owner changed callback: " + entry);
+
+        if ((entry.context().isNear() || entry.context().isLocal()) && owner != null) {
+            IgniteTxEntry txEntry = tx.entry(entry.txKey());
+
+            if (txEntry != null) {
+                if (entry.context().isLocal()) {
+                    GridCacheVersion serReadVer = txEntry.serializableReadVersion();
+
+                    if (serReadVer != null) {
+                        GridCacheContext ctx = entry.context();
+
+                        while (true) {
+                            try {
+                                if (!entry.checkSerializableReadVersion(serReadVer)) {
+                                    Object key = entry.key().value(ctx.cacheObjectContext(), false);
+
+                                    IgniteTxOptimisticCheckedException err0 =
+                                        new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " +
+                                            "read/write conflict [key=" + key + ", cache=" + ctx.name() + ']');
+
+                                    err.compareAndSet(null, err0);
+                                }
+
+                                break;
+                            }
+                            catch (GridCacheEntryRemovedException e) {
+                                entry = ctx.cache().entryEx(entry.key());
+
+                                txEntry.cached(entry);
+                            }
+                        }
+
+                    }
+                }
+
+                keyLockFut.onKeyLocked(entry.txKey());
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<? extends ClusterNode> nodes() {
+        return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() {
+            @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) {
+                if (isMini(f))
+                    return ((MiniFuture)f).node();
+
+                return cctx.discovery().localNode();
+            }
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        boolean found = false;
+
+        for (IgniteInternalFuture<?> fut : futures()) {
+            if (isMini(fut)) {
+                MiniFuture f = (MiniFuture) fut;
+
+                if (f.node().id().equals(nodeId)) {
+                    ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+                        nodeId);
+
+                    e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                    f.onNodeLeft(e);
+
+                    found = true;
+                }
+            }
+        }
+
+        return found;
+    }
+
+    /**
+     * @param m Failed mapping.
+     * @param e Error.
+     */
+    private void onError(@Nullable GridDistributedTxMapping m, Throwable e) {
+        if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
+            if (tx.onePhaseCommit()) {
+                tx.markForBackupCheck();
+
+                onComplete();
+
+                return;
+            }
+        }
+
+        if (e instanceof IgniteTxOptimisticCheckedException) {
+            if (m != null)
+                tx.removeMapping(m.node().id());
+        }
+
+        err.compareAndSet(null, e);
+
+        keyLockFut.onDone(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+        if (!isDone()) {
+            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
+                if (isMini(fut)) {
+                    MiniFuture f = (MiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId())) {
+                        assert f.node().id().equals(nodeId);
+
+                        f.onResult(res);
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(IgniteInternalTx t, Throwable err) {
+        if (isDone())
+            return false;
+
+        if (err != null) {
+            this.err.compareAndSet(null, err);
+
+            keyLockFut.onDone(err);
+        }
+
+        return onComplete();
+    }
+
+    /**
+     * @param f Future.
+     * @return {@code True} if mini-future.
+     */
+    private boolean isMini(IgniteInternalFuture<?> f) {
+        return f.getClass().equals(MiniFuture.class);
+    }
+
+    /**
+     * Completeness callback.
+     *
+     * @return {@code True} if future was finished by this call.
+     */
+    private boolean onComplete() {
+        Throwable err0 = err.get();
+
+        if (err0 == null || tx.needCheckBackup())
+            tx.state(PREPARED);
+
+        if (super.onDone(tx, err0)) {
+            if (err0 != null)
+                tx.setRollbackOnly();
+
+            // Don't forget to clean up.
+            cctx.mvcc().removeFuture(this);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Initializes future.
+     *
+     * @param remap Remap flag.
+     */
+    @Override protected void prepare0(boolean remap, boolean topLocked) {
+        try {
+            boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
+
+            if (!txStateCheck) {
+                if (tx.setRollbackOnly()) {
+                    if (tx.timedOut())
+                        onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+                            "was rolled back: " + this));
+                    else
+                        onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
+                            "[state=" + tx.state() + ", tx=" + this + ']'));
+                }
+                else
+                    onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+                        "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+
+                return;
+            }
+
+            prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
+
+            markInitialized();
+        }
+        catch (IgniteCheckedException e) {
+            onDone(e);
+        }
+    }
+
+    /**
+     * @param reads Read entries.
+     * @param writes Write entries.
+     * @param remap Remap flag.
+     * @param topLocked Topology locked flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void prepare(
+        Iterable<IgniteTxEntry> reads,
+        Iterable<IgniteTxEntry> writes,
+        boolean remap,
+        boolean topLocked
+    ) throws IgniteCheckedException {
+        AffinityTopologyVersion topVer = tx.topologyVersion();
+
+        assert topVer.topologyVersion() > 0;
+
+        txMapping = new GridDhtTxMapping();
+
+        if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+            for (int cacheId : tx.activeCacheIds()) {
+                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+                if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
+                    onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " +
+                        "partition nodes left the grid): " + cacheCtx.name()));
+
+                    return;
+                }
+            }
+        }
+
+        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>();
+
+        for (IgniteTxEntry write : writes)
+            map(write, topVer, mappings, remap, topLocked);
+
+        for (IgniteTxEntry read : reads)
+            map(read, topVer, mappings, remap, topLocked);
+
+        keyLockFut.onAllKeysAdded();
+
+        if (!remap)
+            add(keyLockFut);
+
+        if (isDone()) {
+            if (log.isDebugEnabled())
+                log.debug("Abandoning (re)map because future is done: " + this);
+
+            return;
+        }
+
+        tx.addEntryMapping(mappings.values());
+
+        cctx.mvcc().recheckPendingLocks();
+
+        tx.transactionNodes(txMapping.transactionNodes());
+
+        checkOnePhase();
+
+        for (GridDistributedTxMapping m : mappings.values()) {
+            assert !m.empty();
+
+            MiniFuture fut = new MiniFuture(m);
+
+            add(fut);
+        }
+
+        Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
+
+        Iterator<IgniteInternalFuture<?>> it = futs.iterator();
+
+        while (it.hasNext()) {
+            IgniteInternalFuture<?> fut0 = it.next();
+
+            if (skipFuture(remap, fut0))
+                continue;
+
+            MiniFuture fut = (MiniFuture)fut0;
+
+            IgniteCheckedException err = prepare(fut);
+
+            if (err != null) {
+                while (it.hasNext()) {
+                    fut0 = it.next();
+
+                    if (skipFuture(remap, fut0))
+                        continue;
+
+                    fut = (MiniFuture)fut0;
+
+                    tx.removeMapping(fut.mapping().node().id());
+
+                    fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err));
+                }
+
+                break;
+            }
+        }
+
+        markInitialized();
+    }
+
+    /**
+     * @param remap Remap flag.
+     * @param fut Future.
+     * @return {@code True} if future is not a mini-future or, on remap, has already received a result.
+     */
+    private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) {
+        return !(isMini(fut)) || (remap && ((MiniFuture)fut).rcvRes.get());
+    }
+
+    /**
+     * @param fut Mini future.
+     * @return Prepare error if any.
+     */
+    @Nullable private IgniteCheckedException prepare(final MiniFuture fut) {
+        GridDistributedTxMapping m = fut.mapping();
+
+        final ClusterNode n = m.node();
+
+        GridNearTxPrepareRequest req = new GridNearTxPrepareRequest(
+            futId,
+            tx.topologyVersion(),
+            tx,
+            m.reads(),
+            m.writes(),
+            m.near(),
+            txMapping.transactionNodes(),
+            m.last(),
+            m.lastBackups(),
+            tx.onePhaseCommit(),
+            tx.needReturnValue() && tx.implicit(),
+            tx.implicitSingle(),
+            m.explicitLock(),
+            tx.subjectId(),
+            tx.taskNameHash(),
+            m.clientFirst(),
+            tx.activeCachesDeploymentEnabled());
+
+        for (IgniteTxEntry txEntry : m.writes()) {
+            if (txEntry.op() == TRANSFORM)
+                req.addDhtVersion(txEntry.txKey(), null);
+        }
+
+        // Must lock near entries separately.
+        if (m.near()) {
+            try {
+                tx.optimisticLockEntries(F.concat(false, m.writes(), m.reads()));
+
+                tx.userPrepare();
+            }
+            catch (IgniteCheckedException e) {
+                fut.onResult(e);
+
+                return e;
+            }
+        }
+
+        req.miniId(fut.futureId());
+
+        // If this is the primary node for the keys.
+        if (n.isLocal()) {
+            IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
+
+            prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() {
+                @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) {
+                    try {
+                        fut.onResult(prepFut.get());
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut.onResult(e);
+                    }
+                }
+            });
+        }
+        else {
+            try {
+                cctx.io().send(n, req, tx.ioPolicy());
+            }
+            catch (ClusterTopologyCheckedException e) {
+                e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+                fut.onNodeLeft(e);
+
+                return e;
+            }
+            catch (IgniteCheckedException e) {
+                fut.onResult(e);
+
+                return e;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param entry Transaction entry.
+     * @param topVer Topology version.
+     * @param curMapping Current mapping.
+     * @param remap Remap flag.
+     * @param topLocked Topology locked flag.
+     */
+    private void map(
+        IgniteTxEntry entry,
+        AffinityTopologyVersion topVer,
+        Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping,
+        boolean remap,
+        boolean topLocked
+    ) {
+        GridCacheContext cacheCtx = entry.context();
+
+        List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+
+        txMapping.addMapping(nodes);
+
+        ClusterNode primary = F.first(nodes);
+
+        assert primary != null;
+
+        if (log.isDebugEnabled()) {
+            log.debug("Mapped key to primary node [key=" + entry.key() +
+                ", part=" + cacheCtx.affinity().partition(entry.key()) +
+                ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']');
+        }
+
+        if (primary.version().compareTo(SER_TX_SINCE) < 0) {
+            onDone(new IgniteCheckedException("Optimistic serializable transactions can be used only with node " +
+                "version starting from " + SER_TX_SINCE));
+
+            return;
+        }
+
+        // Must re-initialize cached entry while holding topology lock.
+        if (cacheCtx.isNear())
+            entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
+        else if (!cacheCtx.isLocal())
+            entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
+        else
+            entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
+
+        if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) {
+            if (entry.explicitVersion() == null)
+                keyLockFut.addLockKey(entry.txKey());
+        }
+
+        IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear());
+
+        GridDistributedTxMapping cur = curMapping.get(key);
+
+        if (cur == null) {
+            cur = new GridDistributedTxMapping(primary);
+
+            curMapping.put(key, cur);
+
+            if (primary.isLocal()) {
+                if (entry.context().isNear())
+                    tx.nearLocallyMapped(true);
+                else if (entry.context().isColocated())
+                    tx.colocatedLocallyMapped(true);
+            }
+
+            // Initialize near flag right away.
+            cur.near(cacheCtx.isNear());
+
+            cur.clientFirst(!topLocked && cctx.kernalContext().clientNode());
+
+            cur.last(true);
+        }
+
+        cur.add(entry);
+
+        if (entry.explicitVersion() != null) {
+            tx.markExplicit(primary.id());
+
+            cur.markExplicitLock();
+        }
+
+        entry.nodeId(primary.id());
+
+        if (cacheCtx.isNear()) {
+            while (true) {
+                try {
+                    GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached();
+
+                    cached.dhtNodeId(tx.xidVersion(), primary.id());
+
+                    break;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    entry.cached(cacheCtx.near().entryEx(entry.key()));
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        Collection<String> futs = F.viewReadOnly(futures(),
+            new C1<IgniteInternalFuture<?>, String>() {
+                @Override public String apply(IgniteInternalFuture<?> f) {
+                    return "[node=" + ((MiniFuture)f).node().id() +
+                        ", loc=" + ((MiniFuture)f).node().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+            },
+            new P1<IgniteInternalFuture<?>>() {
+                @Override public boolean apply(IgniteInternalFuture<?> f) {
+                    return isMini(f);
+                }
+            });
+
+        return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this,
+            "innerFuts", futs,
+            "keyLockFut", keyLockFut,
+            "tx", tx,
+            "super", super.toString());
+    }
+
+    /**
+     * Reduces to {@code true} only if every collected response carries a client remap version.
+     */
+    private class ClientRemapFuture extends GridCompoundFuture<GridNearTxPrepareResponse, Boolean> {
+        /** Cleared as soon as one response has no client remap version. */
+        private boolean remap = true;
+
+        /**
+         * Installs the reducer that computes the remap flag.
+         */
+        public ClientRemapFuture() {
+            super();
+
+            reducer(new IgniteReducer<GridNearTxPrepareResponse, Boolean>() {
+                @Override public boolean collect(GridNearTxPrepareResponse res) {
+                    assert res != null;
+
+                    if (res.clientRemapVersion() == null)
+                        remap = false;
+
+                    return true;
+                }
+
+                @Override public Boolean reduce() {
+                    return remap;
+                }
+            });
+        }
+    }
+
+    /**
+     * Mini-future tracking the prepare request for a single node mapping.
+     */
+    private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Mini-future ID, matched against the mini ID of the prepare response. */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /** Mapping. */
+        @GridToStringInclude
+        private GridDistributedTxMapping m;
+
+        /** Flag to signal some result being processed. */
+        private AtomicBoolean rcvRes = new AtomicBoolean(false);
+
+        /**
+         * @param m Mapping.
+         */
+        MiniFuture(GridDistributedTxMapping m) {
+            this.m = m;
+        }
+
+        /**
+         * @return Future ID.
+         */
+        IgniteUuid futureId() {
+            return futId;
+        }
+
+        /**
+         * @return Node.
+         */
+        public ClusterNode node() {
+            return m.node();
+        }
+
+        /**
+         * @return Mapping.
+         */
+        public GridDistributedTxMapping mapping() {
+            return m;
+        }
+
+        /**
+         * @param e Error.
+         */
+        void onResult(Throwable e) {
+            if (rcvRes.compareAndSet(false, true)) {
+                onError(m, e);
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
+
+                // Fail.
+                onDone(e);
+            }
+            else
+                U.warn(log, "Received error after another result has been processed [fut=" +
+                    GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e);
+        }
+
+        /**
+         * @param e Node failure.
+         */
+        void onNodeLeft(ClusterTopologyCheckedException e) {
+            if (isDone())
+                return;
+
+            if (rcvRes.compareAndSet(false, true)) {
+                if (log.isDebugEnabled())
+                    log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this);
+
+                onError(null, e);
+
+                onDone(e);
+            }
+        }
+
+        /**
+         * @param res Result callback.
+         */
+        @SuppressWarnings("unchecked")
+        void onResult(final GridNearTxPrepareResponse res) {
+            if (isDone())
+                return;
+
+            if (rcvRes.compareAndSet(false, true)) {
+                if (res.error() != null) {
+                    // Fail the whole compound future.
+                    onError(m, res.error());
+
+                    onDone(res.error());
+                }
+                else {
+                    if (res.clientRemapVersion() != null) {
+                        assert cctx.kernalContext().clientNode();
+                        assert m.clientFirst();
+
+                        tx.removeMapping(m.node().id());
+
+                        ClientRemapFuture remapFut0 = null;
+
+                        synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
+                            if (remapFut == null) {
+                                remapFut = new ClientRemapFuture();
+
+                                remapFut0 = remapFut;
+                            }
+                        }
+
+                        if (remapFut0 != null) {
+                            Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
+
+                            for (IgniteInternalFuture<?> fut : futs) {
+                                if (isMini(fut) && fut != this)
+                                    remapFut0.add((MiniFuture)fut);
+                            }
+
+                            remapFut0.markInitialized();
+
+                            remapFut0.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                                @Override public void apply(IgniteInternalFuture<Boolean> remapFut0) {
+                                    try {
+                                        IgniteInternalFuture<?> affFut =
+                                            cctx.exchange().affinityReadyFuture(res.clientRemapVersion());
+
+                                        if (affFut == null)
+                                            affFut = new GridFinishedFuture<Object>();
+
+                                        if (remapFut.get()) {
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("Will remap client tx [" +
+                                                    "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this +
+                                                    ", topVer=" + res.topologyVersion() + ']');
+                                            }
+
+                                            synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
+                                                assert remapFut0 == remapFut;
+
+                                                remapFut = null;
+                                            }
+
+                                            affFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                                @Override public void apply(IgniteInternalFuture<?> affFut) {
+                                                    try {
+                                                        affFut.get();
+
+                                                        remap(res);
+                                                    }
+                                                    catch (IgniteCheckedException e) {
+                                                        onDone(e);
+                                                    }
+                                                }
+                                            });
+                                        }
+                                        else {
+                                            ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+                                                "Cluster topology changed while client transaction is preparing.");
+
+                                            err.retryReadyFuture(affFut);
+
+                                            onDone(err);
+                                        }
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("Prepare failed, will not remap tx: " +
+                                                GridNearOptimisticSerializableTxPrepareFuture.this);
+                                        }
+
+                                        onDone(e);
+                                    }
+                                }
+                            });
+                        }
+                        else
+                            onDone(res);
+                    }
+                    else {
+                        onPrepareResponse(m, res);
+
+                        // Finish this mini future (need result only on client node).
+                        onDone(cctx.kernalContext().clientNode() ? res : null);
+                    }
+                }
+            }
+        }
+
+        /**
+         * @param res Response.
+         */
+        private void remap(final GridNearTxPrepareResponse res) {
+            prepareOnTopology(true, new Runnable() {
+                @Override public void run() {
+                    onDone(res);
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+        }
+    }
+
+    /**
+     * Future tracking keys which still have to be locked. It is completed with {@code null} result
+     * once key registration is finished and no tracked keys remain.
+     */
+    private class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
+        /** Registered keys which were not reported as locked yet. */
+        @GridToStringInclude
+        private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+
+        /** Set when key registration is finished. */
+        private volatile boolean allKeysAdded;
+
+        /**
+         * Registers a key, must be called before {@link #onAllKeysAdded()}.
+         *
+         * @param key Key to track for locking.
+         */
+        private void addLockKey(IgniteTxKey key) {
+            assert !allKeysAdded;
+
+            lockKeys.add(key);
+        }
+
+        /**
+         * Stops tracking the key and re-checks whether the future can be completed.
+         *
+         * @param key Locked key.
+         */
+        private void onKeyLocked(IgniteTxKey key) {
+            lockKeys.remove(key);
+
+            checkLocks();
+        }
+
+        /**
+         * Marks key registration as finished and re-checks whether the future can be completed.
+         */
+        private void onAllKeysAdded() {
+            allKeysAdded = true;
+
+            checkLocks();
+        }
+
+        /**
+         * Completes the future if no tracked keys remain and key registration is finished.
+         *
+         * @return {@code True} if there are no tracked keys left.
+         */
+        private boolean checkLocks() {
+            boolean noPendingKeys = lockKeys.isEmpty();
+
+            // Not ready yet: either some keys are still tracked or more keys may be registered.
+            if (!noPendingKeys || !allKeysAdded) {
+                if (log.isDebugEnabled())
+                    log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']');
+
+                return noPendingKeys;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("All locks are acquired for near prepare future: " + this);
+
+            onDone((GridNearTxPrepareResponse)null);
+
+            return noPendingKeys;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(KeyLockFuture.class, this, super.toString());
+        }
+    }
+}


Mime
View raw message