ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [4/4] ignite git commit: Merge branches 'ignite-843' and 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-843
Date Mon, 24 Aug 2015 02:41:02 GMT
Merge branches 'ignite-843' and 'master' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-843


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b9cd1844
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b9cd1844
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b9cd1844

Branch: refs/heads/ignite-843
Commit: b9cd18447c7636bbaa4fe0f78fb05e73f56b01e1
Parents: 6b7593d
Author: Alexey Kuznetsov <akuznetsov@apache.org>
Authored: Mon Aug 24 09:41:02 2015 +0700
Committer: Alexey Kuznetsov <akuznetsov@apache.org>
Committed: Mon Aug 24 09:41:04 2015 +0700

----------------------------------------------------------------------
 examples/config/example-cache.xml               |   2 -
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../store/jdbc/CacheAbstractJdbcStore.java      |  45 +-
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |  32 +-
 .../store/jdbc/dialect/BasicJdbcDialect.java    |   3 +
 .../cache/store/jdbc/dialect/DB2Dialect.java    |   3 +
 .../cache/store/jdbc/dialect/H2Dialect.java     |   3 +
 .../cache/store/jdbc/dialect/JdbcDialect.java   |   3 +-
 .../cache/store/jdbc/dialect/MySQLDialect.java  |   3 +
 .../cache/store/jdbc/dialect/OracleDialect.java |   3 +
 .../store/jdbc/dialect/SQLServerDialect.java    |   3 +
 .../cluster/ClusterTopologyException.java       |  18 +
 .../ignite/internal/MarshallerContextImpl.java  |  24 +-
 .../ClusterTopologyCheckedException.java        |  18 +
 .../CachePartialUpdateCheckedException.java     |  11 +-
 .../processors/cache/GridCacheAdapter.java      |  85 ++-
 .../cache/GridCacheEvictionManager.java         |   2 +-
 .../processors/cache/GridCacheIoManager.java    |   1 -
 .../cache/GridCacheSharedContext.java           |  17 +
 .../processors/cache/GridCacheUtils.java        |  23 +
 .../distributed/GridDistributedCacheEntry.java  |  11 +-
 .../dht/GridClientPartitionTopology.java        |  20 +
 .../distributed/dht/GridDhtCacheAdapter.java    |  12 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |  12 +-
 .../dht/GridDhtPartitionTopology.java           |   7 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  20 +
 .../cache/distributed/dht/GridDhtTxLocal.java   |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 182 ++++++-
 .../dht/GridDhtTxPrepareResponse.java           |  42 +-
 .../dht/GridPartitionedGetFuture.java           | 104 ++--
 .../dht/atomic/GridDhtAtomicCache.java          |  16 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   3 +
 .../dht/colocated/GridDhtColocatedCache.java    |  19 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   8 +-
 .../GridDhtPartitionsExchangeFuture.java        |  40 +-
 .../distributed/near/GridNearAtomicCache.java   |   6 +-
 .../distributed/near/GridNearCacheAdapter.java  |  15 +-
 .../distributed/near/GridNearCacheEntry.java    |  10 +-
 .../distributed/near/GridNearGetFuture.java     | 120 +++--
 .../distributed/near/GridNearLockFuture.java    |  12 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  13 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   9 +-
 .../near/GridNearTransactionalCache.java        |   9 +-
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../near/GridNearTxPrepareResponse.java         |   3 -
 .../cache/local/GridLocalCacheEntry.java        |   4 +-
 .../local/atomic/GridLocalAtomicCache.java      |  17 +-
 .../cache/transactions/IgniteInternalTx.java    |   2 +-
 .../cache/transactions/IgniteTxAdapter.java     |  19 +-
 .../cache/transactions/IgniteTxEntry.java       |  18 +
 .../cache/transactions/IgniteTxHandler.java     |   5 +-
 .../transactions/IgniteTxLocalAdapter.java      |   4 +-
 .../service/GridServiceProcessor.java           |   5 +
 .../ignite/internal/util/IgniteUtils.java       |  10 +-
 .../ignite/internal/util/lang/GridFunc.java     |  14 +
 .../TcpDiscoveryMulticastIpFinder.java          |  38 ++
 .../config/store/jdbc/ignite-type-metadata.xml  |   8 +
 .../store/jdbc/CacheJdbcPojoStoreTest.java      |  33 +-
 ...eJdbcStoreAbstractMultithreadedSelfTest.java |  16 +-
 .../ignite/cache/store/jdbc/model/Person.java   |  26 +-
 .../cache/CrossCacheTxRandomOperationsTest.java | 534 +++++++++++++++++++
 ...teAtomicCacheEntryProcessorNodeJoinTest.java |  32 ++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  | 225 ++++++++
 .../IgniteCacheTopologySafeGetSelfTest.java     | 218 ++++++++
 .../GridCacheMultiNodeLockAbstractTest.java     |  41 +-
 .../GridCacheTransformEventSelfTest.java        |   2 +
 .../IgniteCacheCrossCacheTxFailoverTest.java    | 433 +++++++++++++++
 .../IgniteCachePutRetryAbstractSelfTest.java    |   1 +
 ...gniteCachePutRetryTransactionalSelfTest.java | 187 +++++++
 .../near/GridCacheNearOnlyTopologySelfTest.java |   4 +-
 .../near/GridCacheNearTxForceKeyTest.java       |  76 +++
 ...idCachePartitionedHitsAndMissesSelfTest.java |  20 +-
 ...idCachePartitionedMultiNodeLockSelfTest.java |   8 +-
 ...ridCacheReplicatedMultiNodeLockSelfTest.java |   8 +-
 .../lru/LruNearEvictionPolicySelfTest.java      |  29 +-
 .../LruNearOnlyNearEvictionPolicySelfTest.java  |  55 +-
 .../OptimizedMarshallerNodeFailoverTest.java    |  97 +++-
 .../IgniteCacheFailoverTestSuite.java           |   2 +
 .../IgniteCacheFailoverTestSuite2.java          |   2 +
 .../testsuites/IgniteCacheTestSuite2.java       |   5 +
 .../ignite/schema/parser/DbMetadataReader.java  |  41 +-
 .../parser/dialect/DB2MetadataDialect.java      |   3 +-
 .../parser/dialect/DatabaseMetadataDialect.java |  13 +-
 .../parser/dialect/JdbcMetadataDialect.java     | 129 +++--
 .../parser/dialect/MySQLMetadataDialect.java    |  57 ++
 .../parser/dialect/OracleMetadataDialect.java   | 111 ++--
 .../ignite/schema/model/PojoDescriptor.java     |   6 +-
 .../ignite/schema/model/SchemaDescriptor.java   |  61 +++
 .../schema/parser/DatabaseMetadataParser.java   |  26 +-
 .../org/apache/ignite/schema/ui/Controls.java   |  25 +-
 .../ignite/schema/ui/SchemaImportApp.java       | 157 +++++-
 .../schema/test/AbstractSchemaImportTest.java   |  10 +-
 .../jdbc/CacheJdbcPojoStoreFactorySelfTest.java |  14 +-
 93 files changed, 3437 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/examples/config/example-cache.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-cache.xml b/examples/config/example-cache.xml
index 6d1d0da..98e1a71 100644
--- a/examples/config/example-cache.xml
+++ b/examples/config/example-cache.xml
@@ -37,8 +37,6 @@
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">
     <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
-        <property name="localHost" value="127.0.0.1" />
-
         <property name="cacheConfiguration">
             <list>
                 <!-- Partitioned cache example configuration (Atomic mode). -->

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 7e96b29..7c808df 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -354,6 +354,9 @@ public final class IgniteSystemProperties {
     /** Number of cache operation retries in case of topology exceptions. */
     public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
 
+    /** Number of times pending cache objects will be dumped to the log in case of partition exchange timeout. */
+    public static final String IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD = "IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index b1e223b..b2be8c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -355,6 +355,11 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
      * @throws SQLException If a database access error occurs or this method is called.
      */
     protected Object getColumnValue(ResultSet rs, int colIdx, Class<?> type) throws SQLException {
+        Object val = rs.getObject(colIdx);
+
+        if (val == null)
+            return null;
+
         if (type == int.class)
             return rs.getInt(colIdx);
 
@@ -364,7 +369,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         if (type == double.class)
             return rs.getDouble(colIdx);
 
-        if (type == boolean.class)
+        if (type == boolean.class || type == Boolean.class)
             return rs.getBoolean(colIdx);
 
         if (type == byte.class)
@@ -378,31 +383,23 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
 
         if (type == Integer.class || type == Long.class || type == Double.class ||
             type == Byte.class || type == Short.class ||  type == Float.class) {
-            Object val = rs.getObject(colIdx);
-
-            if (val != null) {
-                Number num = (Number)val;
-
-                if (type == Integer.class)
-                    return num.intValue();
-                else if (type == Long.class)
-                    return num.longValue();
-                else if (type == Double.class)
-                    return num.doubleValue();
-                else if (type == Byte.class)
-                    return num.byteValue();
-                else if (type == Short.class)
-                    return num.shortValue();
-                else if (type == Float.class)
-                    return num.floatValue();
-            }
-            else
-                return EMPTY_COLUMN_VALUE;
+            Number num = (Number)val;
+
+            if (type == Integer.class)
+                return num.intValue();
+            else if (type == Long.class)
+                return num.longValue();
+            else if (type == Double.class)
+                return num.doubleValue();
+            else if (type == Byte.class)
+                return num.byteValue();
+            else if (type == Short.class)
+                return num.shortValue();
+            else if (type == Float.class)
+                return num.floatValue();
         }
 
-        Object val = rs.getObject(colIdx);
-
-        if (type == UUID.class && val != null) {
+        if (type == UUID.class) {
             if (val instanceof UUID)
                 return val;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
index 7b78bda..1ff170e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStore.java
@@ -99,8 +99,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
                         getters.put(field.getJavaName(), cls.getMethod("is" + prop));
                     }
                     catch (NoSuchMethodException e) {
-                        throw new CacheException("Failed to find getter in POJO class [class name=" + clsName +
-                            ", property=" + field.getJavaName() + "]", e);
+                        throw new CacheException("Failed to find getter in POJO class [clsName=" + clsName +
+                            ", prop=" + field.getJavaName() + "]", e);
                     }
                 }
 
@@ -108,8 +108,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
                     setters.put(field.getJavaName(), cls.getMethod("set" + prop, field.getJavaType()));
                 }
                 catch (NoSuchMethodException e) {
-                    throw new CacheException("Failed to find setter in POJO class [class name=" + clsName +
-                        ", property=" + field.getJavaName() + "]", e);
+                    throw new CacheException("Failed to find setter in POJO class [clsName=" + clsName +
+                        ", prop=" + field.getJavaName() + "]", e);
                 }
             }
         }
@@ -167,15 +167,25 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
             Object obj = mc.ctor.newInstance();
 
             for (CacheTypeFieldMetadata field : fields) {
-                Method setter = mc.setters.get(field.getJavaName());
+                String fldJavaName = field.getJavaName();
+
+                Method setter = mc.setters.get(fldJavaName);
 
                 if (setter == null)
-                    throw new CacheLoaderException("Failed to find setter in POJO class [class name=" + typeName +
-                        ", property=" + field.getJavaName() + "]");
+                    throw new IllegalStateException("Failed to find setter in POJO class [clsName=" + typeName +
+                        ", prop=" + fldJavaName + "]");
+
+                String fldDbName = field.getDatabaseName();
 
-                Integer colIdx = loadColIdxs.get(field.getDatabaseName());
+                Integer colIdx = loadColIdxs.get(fldDbName);
 
-                setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
+                try {
+                    setter.invoke(obj, getColumnValue(rs, colIdx, field.getJavaType()));
+                }
+                catch (Exception e) {
+                    throw new IllegalStateException("Failed to set property in POJO class [clsName=" + typeName +
+                        ", prop=" + fldJavaName + ", col=" + colIdx + ", dbName=" + fldDbName + "]", e);
+                }
             }
 
             return (R)obj;
@@ -204,8 +214,8 @@ public class CacheJdbcPojoStore<K, V> extends CacheAbstractJdbcStore<K, V> {
             Method getter = mc.getters.get(fieldName);
 
             if (getter == null)
-                throw new CacheLoaderException("Failed to find getter in POJO class [class name=" + typeName +
-                    ", property=" + fieldName + "]");
+                throw new CacheLoaderException("Failed to find getter in POJO class [clsName=" + typeName +
+                    ", prop=" + fieldName + "]");
 
             return getter.invoke(obj);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index d0dd6f4..b43c7d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -26,6 +26,9 @@ import java.util.*;
  * Basic implementation of dialect based on JDBC specification.
  */
 public class BasicJdbcDialect implements JdbcDialect {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** Default max query parameters count. */
     protected static final int DFLT_MAX_PARAMS_CNT = 2000;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
index fe1d876..2a08557 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/DB2Dialect.java
@@ -25,6 +25,9 @@ import java.util.*;
  * A dialect compatible with the IBM DB2 database.
  */
 public class DB2Dialect extends BasicJdbcDialect {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** {@inheritDoc} */
     @Override public boolean hasMerge() {
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
index a97e144..8091e1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/H2Dialect.java
@@ -25,6 +25,9 @@ import java.util.*;
  * A dialect compatible with the H2 database.
  */
 public class H2Dialect extends BasicJdbcDialect {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** {@inheritDoc} */
     @Override public boolean hasMerge() {
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
index be1cc67..32adcc4 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/JdbcDialect.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.cache.store.jdbc.dialect;
 
+import java.io.*;
 import java.util.*;
 
 /**
  * Represents a dialect of SQL implemented by a particular RDBMS.
  */
-public interface JdbcDialect {
+public interface JdbcDialect extends Serializable {
     /**
      * Construct select count query.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
index df16841..def2fe7 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/MySQLDialect.java
@@ -25,6 +25,9 @@ import java.util.*;
  * A dialect compatible with the MySQL database.
  */
 public class MySQLDialect extends BasicJdbcDialect {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** {@inheritDoc} */
     @Override public boolean hasMerge() {
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
index 351f10a..e155fb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/OracleDialect.java
@@ -25,6 +25,9 @@ import java.util.*;
  * A dialect compatible with the Oracle database.
  */
 public class OracleDialect extends BasicJdbcDialect {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** {@inheritDoc} */
     @Override public boolean hasMerge() {
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
index e781e98..7fdda6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/SQLServerDialect.java
@@ -25,6 +25,9 @@ import java.util.*;
  * A dialect compatible with the Microsoft SQL Server database.
  */
 public class SQLServerDialect extends BasicJdbcDialect {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** {@inheritDoc} */
     @Override public boolean hasMerge() {
         return true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
index d28c409..61bc367 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.cluster;
 
 import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 /**
@@ -27,6 +28,9 @@ public class ClusterTopologyException extends IgniteException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Retry ready future. */
+    private transient IgniteFuture<?> readyFut;
+
     /**
      * Creates new topology exception with given error message.
      *
@@ -46,4 +50,18 @@ public class ClusterTopologyException extends IgniteException {
     public ClusterTopologyException(String msg, @Nullable Throwable cause) {
         super(msg, cause);
     }
+
+    /**
+     * @return Retry ready future.
+     */
+    public IgniteFuture<?> retryReadyFuture() {
+        return readyFut;
+    }
+
+    /**
+     * @param readyFut Retry ready future.
+     */
+    public void retryReadyFuture(IgniteFuture<?> readyFut) {
+        this.readyFut = readyFut;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 9f7c983..dc0fd57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.*;
 
@@ -135,7 +136,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
                 throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping).");
         }
 
-        String clsName = cache0.get(id);
+        String clsName = cache0.getTopologySafe(id);
 
         if (clsName == null) {
             File file = new File(workDir, id + ".classname");
@@ -177,18 +178,21 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
         @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
             throws CacheEntryListenerException {
             for (CacheEntryEvent<? extends Integer, ? extends String> evt : events) {
-                assert evt.getOldValue() == null : "Received non-null old value for system marshaller cache: " + evt;
+                assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()):
+                    "Received cache entry update for system marshaller cache: " + evt;
 
-                File file = new File(workDir, evt.getKey() + ".classname");
+                if (evt.getOldValue() == null) {
+                    File file = new File(workDir, evt.getKey() + ".classname");
 
-                try (Writer writer = new FileWriter(file)) {
-                    writer.write(evt.getValue());
+                    try (Writer writer = new FileWriter(file)) {
+                        writer.write(evt.getValue());
 
-                    writer.flush();
-                }
-                catch (IOException e) {
-                    U.error(log, "Failed to write class name to file [id=" + evt.getKey() +
-                        ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e);
+                        writer.flush();
+                    }
+                    catch (IOException e) {
+                        U.error(log, "Failed to write class name to file [id=" + evt.getKey() +
+                            ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
index 8f985b4..2d7b0de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.cluster;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.jetbrains.annotations.*;
 
 /**
@@ -27,6 +28,9 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Next topology version to wait. */
+    private transient IgniteInternalFuture<?> readyFut;
+
     /**
      * Creates new topology exception with given error message.
      *
@@ -46,4 +50,18 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
     public ClusterTopologyCheckedException(String msg, @Nullable Throwable cause) {
         super(msg, cause);
     }
+
+    /**
+     * @return Retry ready future.
+     */
+    public IgniteInternalFuture<?> retryReadyFuture() {
+        return readyFut;
+    }
+
+    /**
+     * @param readyFut Retry ready future.
+     */
+    public void retryReadyFuture(IgniteInternalFuture<?> readyFut) {
+        this.readyFut = readyFut;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
index c2259df..ab38e5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartialUpdateCheckedException.java
@@ -47,8 +47,9 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
      * Gets collection of failed keys.
      * @return Collection of failed keys.
      */
-    public <K> Collection<K> failedKeys() {
-        return (Collection<K>)failedKeys;
+    @SuppressWarnings("unchecked")
+    public synchronized <K> Collection<K> failedKeys() {
+        return new LinkedHashSet<>((Collection<K>)failedKeys);
     }
 
     /**
@@ -56,7 +57,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
      * @param err Error.
      * @param topVer Topology version for failed update.
      */
-    public void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
+    public synchronized void add(Collection<?> failedKeys, Throwable err, AffinityTopologyVersion topVer) {
         if (topVer != null) {
             AffinityTopologyVersion topVer0 = this.topVer;
 
@@ -72,7 +73,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
     /**
      * @return Topology version.
      */
-    public AffinityTopologyVersion topologyVersion() {
+    public synchronized AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
@@ -80,7 +81,7 @@ public class CachePartialUpdateCheckedException extends IgniteCheckedException {
      * @param failedKeys Failed keys.
      * @param err Error.
      */
-    public void add(Collection<?> failedKeys, Throwable err) {
+    public synchronized void add(Collection<?> failedKeys, Throwable err) {
         add(failedKeys, err, null);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 177dcfb..7adea2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -526,7 +526,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             /*subj id*/null,
             /*task name*/null,
             /*deserialize portable*/false,
-            /*skip values*/true
+            /*skip values*/true,
+            /*can remap*/true
         ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
             @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
                 Map<K, V> map = fut.get();
@@ -560,7 +561,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             /*subj id*/null,
             /*task name*/null,
             /*deserialize portable*/false,
-            /*skip values*/true
+            /*skip values*/true,
+            /*can remap*/true
         ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
             @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
                 Map<K, V> kvMap = fut.get();
@@ -894,7 +896,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<Cache.Entry<K, V>> entrySet() {
-        return entrySet((CacheEntryPredicate[]) null);
+        return entrySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
@@ -919,12 +921,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
-        return primaryKeySet((CacheEntryPredicate[]) null);
+        return primaryKeySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<V> values() {
-        return values((CacheEntryPredicate[]) null);
+        return values((CacheEntryPredicate[])null);
     }
 
     /**
@@ -1210,22 +1212,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     @Override public V getForcePrimary(K key) throws IgniteCheckedException {
         String taskName = ctx.kernalContext().job().currentTaskName();
 
-        return getAllAsync(F.asList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false)
-            .get().get(key);
+        return getAllAsync(
+            F.asList(key),
+            /*force primary*/true,
+            /*skip tx*/false,
+            /*cached entry*/null,
+            /*subject id*/null,
+            taskName,
+            /*deserialize cache objects*/true,
+            /*skip values*/false,
+            /*can remap*/true
+        ).get().get(key);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) {
         String taskName = ctx.kernalContext().job().currentTaskName();
 
-        return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
-            taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
+        return getAllAsync(
+            Collections.singletonList(key),
+            /*force primary*/true,
+            /*skip tx*/false,
+            null,
+            null,
+            taskName,
+            true,
+            false,
+            /*can remap*/true
+        ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
             @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
                 return e.get().get(key);
             }
         });
     }
 
+    /**
+     * Gets value without waiting for toplogy changes.
+     *
+     * @param key Key.
+     * @return Value.
+     * @throws IgniteCheckedException If failed.
+     */
+    public V getTopologySafe(K key) throws IgniteCheckedException {
+        String taskName = ctx.kernalContext().job().currentTaskName();
+
+        return getAllAsync(
+            F.asList(key),
+            /*force primary*/false,
+            /*skip tx*/false,
+            /*cached entry*/null,
+            /*subject id*/null,
+            taskName,
+            /*deserialize cache objects*/true,
+            /*skip values*/false,
+            /*can remap*/false
+        ).get().get(key);
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
         return getAllOutTxAsync(keys).get();
@@ -1242,7 +1285,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             null,
             taskName,
             !ctx.keepPortable(),
-            false);
+            /*skip values*/false,
+            /*can remap*/true);
     }
 
     /**
@@ -1577,7 +1621,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
@@ -1592,7 +1637,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             deserializePortable,
             forcePrimary,
             skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
-            skipVals);
+            skipVals,
+            canRemap);
     }
 
     /**
@@ -1618,7 +1664,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         final boolean deserializePortable,
         final boolean forcePrimary,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        final boolean skipVals
+        final boolean skipVals,
+        boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -1633,7 +1680,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             deserializePortable,
             expiry,
             skipVals,
-            false);
+            false,
+            canRemap);
     }
 
     /**
@@ -1656,7 +1704,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         final boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiry,
         final boolean skipVals,
-        final boolean keepCacheObjects
+        final boolean keepCacheObjects,
+        boolean canRemap
         ) {
         if (F.isEmpty(keys))
             return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1679,7 +1728,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 assert keys != null;
 
                 final AffinityTopologyVersion topVer = tx == null
-                    ? ctx.affinity().affinityTopologyVersion()
+                    ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion())
                     : tx.topologyVersion();
 
                 final Map<K1, V1> map = new GridLeanMap<>(keys.size());
@@ -4465,7 +4514,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             null,
             taskName,
             deserializePortable,
-            false);
+            false,
+            /*can remap*/true);
     }
 
     /**
@@ -4693,6 +4743,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         }
 
         /**
+         *
          */
         public void execute() {
             tx = ctx.tm().newTx(

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index 55669a7..fd71ab5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -134,7 +134,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
         memoryMode = cctx.config().getMemoryMode();
 
-        plcEnabled = plc != null && memoryMode != OFFHEAP_TIERED;
+        plcEnabled = plc != null && (cctx.isNear() || memoryMode != OFFHEAP_TIERED);
 
         filter = cfg.getEvictionFilter();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 84e4dc2..0ef190e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -493,7 +493,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     req.version(),
                     null,
                     null,
-                    null,
                     null);
 
                 res.error(req.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 4075d79..cc661d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -531,6 +531,23 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * Gets ready future for the next affinity topology version (used in cases when a node leaves grid).
+     *
+     * @param curVer Current topology version (before a node left grid).
+     * @return Ready future.
+     */
+    public IgniteInternalFuture<?> nextAffinityReadyFuture(AffinityTopologyVersion curVer) {
+        if (curVer == null)
+            return null;
+
+        AffinityTopologyVersion nextVer = new AffinityTopologyVersion(curVer.topologyVersion() + 1);
+
+        IgniteInternalFuture<?> fut = exchMgr.affinityReadyFuture(nextVer);
+
+        return fut == null ? new GridFinishedFuture<>() : fut;
+    }
+
+    /**
      * @param tx Transaction to check.
      * @param activeCacheIds Active cache IDs.
      * @param cacheCtx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index a313e3d..bf55f59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1674,6 +1674,29 @@ public class GridCacheUtils {
     }
 
     /**
+     * @param partsMap Cache ID to partition IDs collection map.
+     * @return Cache ID to partition ID array map.
+     */
+    public static Map<Integer, int[]> convertInvalidPartitions(Map<Integer, Set<Integer>> partsMap) {
+        Map<Integer, int[]> res = new HashMap<>(partsMap.size());
+
+        for (Map.Entry<Integer, Set<Integer>> entry : partsMap.entrySet()) {
+            Set<Integer> parts = entry.getValue();
+
+            int[] partsArray = new int[parts.size()];
+
+            int idx = 0;
+
+            for (Integer part : parts)
+                partsArray[idx++] = part;
+
+            res.put(entry.getKey(), partsArray);
+        }
+
+        return res;
+    }
+
+    /**
      * Stops store session listeners.
      *
      * @param ctx Kernal context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index bd72764..59d75be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.distributed;
 
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
@@ -68,6 +69,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
      *
      * @param threadId Owning thread ID.
      * @param ver Lock version.
+     * @param topVer Topology version.
      * @param timeout Timeout to acquire lock.
      * @param reenter Reentry flag.
      * @param tx Transaction flag.
@@ -78,6 +80,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     @Nullable public GridCacheMvccCandidate addLocal(
         long threadId,
         GridCacheVersion ver,
+        AffinityTopologyVersion topVer,
         long timeout,
         boolean reenter,
         boolean tx,
@@ -105,6 +108,9 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
 
             cand = mvcc.addLocal(this, threadId, ver, timeout, reenter, tx, implicitSingle);
 
+            if (cand != null)
+                cand.topologyVersion(topVer);
+
             owner = mvcc.anyOwner();
 
             boolean emptyAfter = mvcc.isEmpty();
@@ -732,6 +738,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
             return addLocal(
                 tx.threadId(),
                 tx.xidVersion(),
+                tx.topologyVersion(),
                 timeout,
                 false,
                 true,
@@ -828,8 +835,10 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
 
                 // Allow next lock in the thread to proceed.
                 if (!cand.used()) {
+                    GridCacheContext cctx0 = cand.parent().context();
+
                     GridDistributedCacheEntry e =
-                        (GridDistributedCacheEntry)cctx.cache().peekEx(cand.key());
+                        (GridDistributedCacheEntry)cctx0.cache().peekEx(cand.key());
 
                     if (e != null)
                         e.recheck();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c3f3e7f..531678e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         lock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 22a5287..49fbc5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -539,7 +539,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable UUID subjId,
         String taskName,
         boolean deserializePortable,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
@@ -552,7 +553,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             deserializePortable,
             forcePrimary,
             null,
-            skipVals);
+            skipVals,
+            canRemap);
     }
 
     /**
@@ -570,7 +572,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
         @Nullable UUID subjId,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
         ) {
         return getAllAsync0(keys,
             readThrough,
@@ -580,7 +583,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             false,
             expiry,
             skipVals,
-            /*keep cache objects*/true);
+            /*keep cache objects*/true,
+            canRemap);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 742fbfe..9005541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -349,12 +349,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
             }
             else {
                 if (tx == null) {
-                    fut = cache().getDhtAllAsync(keys.keySet(),
+                    fut = cache().getDhtAllAsync(
+                        keys.keySet(),
                         readThrough,
                         subjId,
                         taskName,
                         expiryPlc,
-                        skipVals);
+                        skipVals,
+                        /*can remap*/true);
                 }
                 else {
                     fut = tx.getAllAsync(cctx,
@@ -387,12 +389,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                         }
                         else {
                             if (tx == null) {
-                                return cache().getDhtAllAsync(keys.keySet(),
+                                return cache().getDhtAllAsync(
+                                    keys.keySet(),
                                     readThrough,
                                     subjId,
                                     taskName,
                                     expiryPlc,
-                                    skipVals);
+                                    skipVals,
+                                    /*can remap*/true);
                             }
                             else {
                                 return tx.getAllAsync(cctx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..7b08510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology {
     public GridDhtPartitionMap localPartitionMap();
 
     /**
+     * @param nodeId Node ID.
+     * @param part Partition.
+     * @return Partition state.
+     */
+    public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+    /**
      * @return Current update sequence.
      */
     public long updateSequence();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index facf329..8973dcd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -638,6 +638,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6a72c89..7da6e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @return Future that will be completed when locks are acquired.
      */
     public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
-        @Nullable Iterable<IgniteTxEntry> reads,
-        @Nullable Iterable<IgniteTxEntry> writes,
+        @Nullable Collection<IgniteTxEntry> reads,
+        @Nullable Collection<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
         long msgId,
         IgniteUuid nearMiniId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 9bd5de2..2071f8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     /** Keys that should be locked. */
     private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
 
+    /** Force keys future for correct transforms. */
+    private IgniteInternalFuture<?> forceKeysFut;
+
     /** Locks ready flag. */
     private volatile boolean locksReady;
 
@@ -291,14 +294,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
 
-                if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+                if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
+                    CacheObject val;
+
                     cached.unswap(retVal);
 
                     boolean readThrough = (retVal || hasFilters) &&
                         cacheCtx.config().isLoadPreviousValue() &&
                         !txEntry.skipStore();
 
-                    CacheObject val = cached.innerGet(
+                    val = cached.innerGet(
                         tx,
                         /*swap*/true,
                         readThrough,
@@ -312,10 +317,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         null,
                         null);
 
-                    if (retVal) {
+                    if (retVal || txEntry.op() == TRANSFORM) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
                             invoke = true;
 
+                            if (txEntry.hasValue())
+                                val = txEntry.value();
+
                             KeyCacheObject key = txEntry.key();
 
                             Object procRes = null;
@@ -339,12 +347,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                 }
                             }
 
-                            if (err != null || procRes != null)
-                                ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
-                            else
-                                ret.invokeResult(true);
+                            txEntry.entryProcessorCalculatedValue(val);
+
+                            if (retVal) {
+                                if (err != null || procRes != null)
+                                    ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
+                                else
+                                    ret.invokeResult(true);
+                            }
                         }
-                        else
+                        else if (retVal)
                             ret.value(cacheCtx, val);
                     }
 
@@ -362,7 +374,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         ret.success(false);
                     }
                     else
-                        ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue());
+                        ret.success(txEntry.op() != DELETE || cached.hasValue());
                 }
             }
             catch (IgniteCheckedException e) {
@@ -466,7 +478,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      */
     private boolean mapIfLocked() {
         if (checkLocks()) {
-            prepare0();
+            if (!mapped.compareAndSet(false, true))
+                return false;
+
+            if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
+                prepare0();
+            else {
+                forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        try {
+                            f.get();
+
+                            prepare0();
+                        }
+                        catch (IgniteCheckedException e) {
+                            onError(e);
+                        }
+                    }
+                });
+            }
 
             return true;
         }
@@ -574,13 +604,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         // Send reply back to originating near node.
         Throwable prepErr = err.get();
 
+        assert F.isEmpty(tx.invalidPartitions());
+
         GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(
             tx.nearXidVersion(),
             tx.colocated() ? tx.xid() : tx.nearFutureId(),
             nearMiniId == null ? tx.xid() : nearMiniId,
             tx.xidVersion(),
             tx.writeVersion(),
-            tx.invalidPartitions(),
             ret,
             prepErr,
             null);
@@ -708,7 +739,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param writes Write entries.
      * @param txNodes Transaction nodes mapping.
      */
-    public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes,
+    @SuppressWarnings("TypeMayBeWeakened")
+    public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
         Map<UUID, Collection<UUID>> txNodes) {
         if (tx.empty()) {
             tx.setRollbackOnly();
@@ -720,6 +752,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         this.writes = writes;
         this.txNodes = txNodes;
 
+        if (!F.isEmpty(writes)) {
+            Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
+
+            for (IgniteTxEntry entry : writes)
+                forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+
+            forceKeysFut = forceRebalanceKeys(forceKeys);
+        }
+
         readyLocks();
 
         mapIfLocked();
@@ -734,12 +775,79 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * Checks if this transaction needs previous value for the given tx entry. Will use passed in map to store
+     * required key or will create new map if passed in map is {@code null}.
      *
+     * @param e TX entry.
+     * @param map Map with needed preload keys.
+     * @return Map if it was created.
      */
-    private void prepare0() {
-        if (!mapped.compareAndSet(false, true))
-            return;
+    private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys(
+        IgniteTxEntry e,
+        Map<Integer, Collection<KeyCacheObject>> map
+    ) {
+        if (retVal || !F.isEmpty(e.entryProcessors())) {
+            if (map == null)
+                map = new HashMap<>();
+
+            Collection<KeyCacheObject> keys = map.get(e.cacheId());
+
+            if (keys == null) {
+                keys = new ArrayList<>();
+
+                map.put(e.cacheId(), keys);
+            }
+
+            keys.add(e.key());
+        }
+
+        return map;
+    }
+
+    /**
+     * @param keysMap Keys to request.
+     * @return Keys request future.
+     */
+    private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
+        if (F.isEmpty(keysMap))
+            return null;
+
+        GridCompoundFuture<Object, Object> compFut = null;
+        IgniteInternalFuture<Object> lastForceFut = null;
+
+        for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) {
+            if (lastForceFut != null && compFut == null) {
+                compFut = new GridCompoundFuture();
+
+                compFut.add(lastForceFut);
+            }
+
+            int cacheId = entry.getKey();
 
+            Collection<KeyCacheObject> keys = entry.getValue();
+
+            lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+
+            if (compFut != null)
+                compFut.add(lastForceFut);
+        }
+
+        if (compFut != null) {
+            compFut.markInitialized();
+
+            return compFut;
+        }
+        else {
+            assert lastForceFut != null;
+
+            return lastForceFut;
+        }
+    }
+
+    /**
+     *
+     */
+    private void prepare0() {
         try {
             // We are holding transaction-level locks for entries here, so we can get next write version.
             onEntriesLocked();
@@ -956,7 +1064,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     private boolean map(
         IgniteTxEntry entry,
         Map<UUID, GridDistributedTxMapping> futDhtMap,
-        Map<UUID, GridDistributedTxMapping> futNearMap) {
+        Map<UUID, GridDistributedTxMapping> futNearMap
+    ) {
         if (entry.cached().isLocal())
             return false;
 
@@ -1023,14 +1132,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param locMap Exclude map.
      * @return {@code True} if mapped.
      */
-    private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
-        Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+    private boolean map(
+        IgniteTxEntry entry,
+        Iterable<ClusterNode> nodes,
+        Map<UUID, GridDistributedTxMapping> globalMap,
+        Map<UUID, GridDistributedTxMapping> locMap
+    ) {
         boolean ret = false;
 
         if (nodes != null) {
             for (ClusterNode n : nodes) {
                 GridDistributedTxMapping global = globalMap.get(n.id());
 
+                if (!F.isEmpty(entry.entryProcessors())) {
+                    GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                        entry.cached().partition());
+
+                    if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                        CacheObject procVal = entry.entryProcessorCalculatedValue();
+
+                        entry.op(procVal == null ? DELETE : UPDATE);
+                        entry.value(procVal, true, false);
+                        entry.entryProcessors(null);
+                    }
+                }
+
                 if (global == null)
                     globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
 
@@ -1194,6 +1320,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                 }
 
                 // Process invalid partitions (no need to remap).
+                // Keep this loop for backward compatibility.
                 if (!F.isEmpty(res.invalidPartitions())) {
                     for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
                         IgniteTxEntry entry  = it.next();
@@ -1206,6 +1333,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                     ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
                         }
                     }
+                }
+
+                // Process invalid partitions (no need to remap).
+                if (!F.isEmpty(res.invalidPartitionsByCacheId())) {
+                    Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId();
+
+                    for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) {
+                        IgniteTxEntry entry  = it.next();
+
+                        int[] invalidParts = invalidPartsMap.get(entry.cacheId());
+
+                        if (invalidParts != null && F.contains(invalidParts, entry.cached().partition())) {
+                            it.remove();
+
+                            if (log.isDebugEnabled())
+                                log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() +
+                                    ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']');
+                        }
+                    }
 
                     if (dhtMapping.empty()) {
                         dhtMap.remove(nodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 753c117..bcf7f8b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -55,6 +55,10 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     @GridDirectCollection(int.class)
     private Collection<Integer> invalidParts;
 
+    /** Invalid partitions by cache ID. */
+    @GridDirectMap(keyType = Integer.class, valueType = int[].class)
+    private Map<Integer, int[]> invalidPartsByCacheId;
+
     /** Preload entries. */
     @GridDirectCollection(GridCacheEntryInfo.class)
     private List<GridCacheEntryInfo> preloadEntries;
@@ -140,6 +144,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
     }
 
     /**
+     * @return Map from cacheId to an array of invalid partitions.
+     */
+    public Map<Integer, int[]> invalidPartitionsByCacheId() {
+        return invalidPartsByCacheId;
+    }
+
+    /**
+     * @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions.
+     */
+    public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) {
+        this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId);
+    }
+
+    /**
      * Gets preload entries found on backup node.
      *
      * @return Collection of entry infos need to be preloaded.
@@ -238,18 +256,24 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG))
                     return false;
 
@@ -288,7 +312,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 10:
-                miniId = reader.readIgniteUuid("miniId");
+                invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -296,7 +320,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 11:
-                nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -304,6 +328,14 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
                 reader.incrementState();
 
             case 12:
+                nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -323,6 +355,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index bb3673d..6e39672 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -61,7 +61,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         DFLT_MAX_REMAP_CNT);
 
     /** Context. */
-    private GridCacheContext<K, V> cctx;
+    private final GridCacheContext<K, V> cctx;
 
     /** Keys. */
     private Collection<KeyCacheObject> keys;
@@ -105,6 +105,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     /** Skip values flag. */
     private boolean skipVals;
 
+    /** Flag indicating whether future can be remapped on a newer topology version. */
+    private final boolean canRemap;
+
     /**
      * @param cctx Context.
      * @param keys Keys.
@@ -130,7 +133,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         String taskName,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals
+        boolean skipVals,
+        boolean canRemap
     ) {
         super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
@@ -147,6 +151,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         this.taskName = taskName;
         this.expiryPlc = expiryPlc;
         this.skipVals = skipVals;
+        this.canRemap = canRemap;
 
         futId = IgniteUuid.randomUuid();
 
@@ -160,7 +165,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
      * Initializes future.
      */
     public void init() {
-        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
+        AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+            canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
 
         map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
 
@@ -334,7 +340,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                             remapKeys.add(key);
                     }
 
-                    AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+                    AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
 
                     assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
                         "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -461,7 +467,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
                     }
                 }
 
-                ClusterNode node = cctx.affinity().primary(key, topVer);
+                ClusterNode node = affinityNode(key, topVer);
 
                 if (node == null) {
                     onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -522,6 +528,28 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
     }
 
     /**
+     * Finds affinity node to send get request to.
+     *
+     * @param key Key to get.
+     * @param topVer Topology version.
+     * @return Affinity node from which the key will be requested.
+     */
+    private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        if (!canRemap) {
+            List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer);
+
+            for (ClusterNode node : nodes) {
+                if (cctx.discovery().alive(node))
+                    return node;
+            }
+
+            return null;
+        }
+        else
+            return cctx.affinity().primary(key, topVer);
+    }
+
+    /**
      * @param infos Entry infos.
      * @return Result map.
      */
@@ -557,14 +585,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
         /** Node ID. */
-        private ClusterNode node;
+        private final ClusterNode node;
 
         /** Keys. */
         @GridToStringInclude
-        private LinkedHashMap<KeyCacheObject, Boolean> keys;
+        private final LinkedHashMap<KeyCacheObject, Boolean> keys;
 
         /** Topology version on which this future was mapped. */
-        private AffinityTopologyVersion topVer;
+        private final AffinityTopologyVersion topVer;
 
         /** {@code True} if remapped after node left. */
         private boolean remapped;
@@ -625,37 +653,45 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
             if (log.isDebugEnabled())
                 log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
 
-            final AffinityTopologyVersion updTopVer =
-                new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+            // Try getting from existing nodes.
+            if (!canRemap) {
+                map(keys.keySet(), F.t(node, keys), topVer);
 
-            final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                cctx.kernalContext().config().getNetworkTimeout(),
-                updTopVer,
-                e);
-
-            cctx.affinity().affinityReadyFuture(updTopVer).listen(
-                new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                    @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                        if (timeout.finish()) {
-                            cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
-                            try {
-                                fut.get();
-
-                                // Remap.
-                                map(keys.keySet(), F.t(node, keys), updTopVer);
-
-                                onDone(Collections.<K, V>emptyMap());
-                            }
-                            catch (IgniteCheckedException e) {
-                                GridPartitionedGetFuture.this.onDone(e);
+                onDone(Collections.<K, V>emptyMap());
+            }
+            else {
+                final AffinityTopologyVersion updTopVer =
+                    new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+
+                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+                    cctx.kernalContext().config().getNetworkTimeout(),
+                    updTopVer,
+                    e);
+
+                cctx.affinity().affinityReadyFuture(updTopVer).listen(
+                    new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            if (timeout.finish()) {
+                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+
+                                try {
+                                    fut.get();
+
+                                    // Remap.
+                                    map(keys.keySet(), F.t(node, keys), updTopVer);
+
+                                    onDone(Collections.<K, V>emptyMap());
+                                }
+                                catch (IgniteCheckedException e) {
+                                    GridPartitionedGetFuture.this.onDone(e);
+                                }
                             }
                         }
                     }
-                }
-            );
+                );
 
-            cctx.kernalContext().timeout().addTimeoutObject(timeout);
+                cctx.kernalContext().timeout().addTimeoutObject(timeout);
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bee34d9..c44b028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -248,7 +248,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable UUID subjId,
         final String taskName,
         final boolean deserializePortable,
-        final boolean skipVals
+        final boolean skipVals,
+        final boolean canRemap
     ) {
         ctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -278,7 +279,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     deserializePortable,
                     expiryPlc,
                     skipVals,
-                    skipStore);
+                    skipStore,
+                    canRemap);
             }
         });
     }
@@ -870,8 +872,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean deserializePortable,
         @Nullable ExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean skipStore) {
-        AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+        boolean skipStore,
+        boolean canRemap
+    ) {
+        AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
+            ctx.shared().exchange().readyAffinityVersion();
 
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
 
@@ -971,7 +976,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             taskName,
             deserializePortable,
             expiry,
-            skipVals);
+            skipVals,
+            canRemap);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b9cd1844/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index e527f08..07ec808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -231,6 +231,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
         nearEnabled = CU.isNearEnabled(cctx);
 
+        if (!waitTopFut)
+            remapCnt = 1;
+
         this.remapCnt = new AtomicInteger(remapCnt);
     }
 


Mime
View raw message