ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [11/43] ignite git commit: ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished Implemented 'late affinity assignment', also fixes: - fixed BinaryObject/BinaryReaderExImpl to properly handle case when class
Date Tue, 12 Apr 2016 00:08:33 GMT
ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished
Implemented 'late affinity assignment', also fixes:
- fixed BinaryObject/BinaryReaderExImpl to properly handle case when class name is written instead of type id
- fixed datastructures code to not retry updates inside another transaction (otherwise the wait for retry hangs)
- fixed races in dynamic cache start (races between cache data received on exchange, exchange, GridDhtAffinityAssignmentRequest processing)
- changed GridCacheAdapter.asyncOp to pass ready affinity version in tx op (otherwise async ops can block system threads waiting for affinity)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7e223f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7e223f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7e223f7

Branch: refs/heads/ignite-2949
Commit: e7e223f7c6b73dc0fa28a545671a528b78a00271
Parents: 00a3937
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Apr 5 14:37:23 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Apr 5 14:37:25 2016 +0300

----------------------------------------------------------------------
 .../ignite/cache/affinity/AffinityFunction.java |    2 +
 .../configuration/IgniteConfiguration.java      |   50 +
 .../apache/ignite/internal/IgniteKernal.java    |    2 +
 .../ignite/internal/IgniteNodeAttributes.java   |    3 +
 .../internal/binary/BinaryObjectImpl.java       |   52 +-
 .../binary/BinaryObjectOffheapImpl.java         |   40 +-
 .../internal/binary/BinaryReaderExImpl.java     |   62 +-
 .../ignite/internal/binary/BinaryTypeImpl.java  |    8 +
 .../ignite/internal/binary/BinaryUtils.java     |    4 +-
 .../internal/binary/GridBinaryMarshaller.java   |    4 +-
 .../binary/builder/BinaryBuilderReader.java     |   11 +-
 .../communication/GridIoMessageFactory.java     |    4 +-
 .../discovery/GridDiscoveryManager.java         |   70 +-
 .../affinity/GridAffinityAssignment.java        |   49 +-
 .../affinity/GridAffinityAssignmentCache.java   |  246 +-
 .../affinity/GridAffinityProcessor.java         |    2 +-
 .../processors/affinity/GridAffinityUtils.java  |    3 +-
 .../cache/CacheAffinityChangeMessage.java       |  160 ++
 .../cache/CacheAffinitySharedManager.java       | 1805 ++++++++++++
 .../cache/DynamicCacheChangeRequest.java        |   17 +
 .../cache/DynamicCacheDescriptor.java           |   49 +
 .../processors/cache/GridCacheAdapter.java      |  276 +-
 .../cache/GridCacheAffinityManager.java         |  125 +-
 .../cache/GridCacheClearAllRunnable.java        |    1 -
 .../processors/cache/GridCacheContext.java      |    2 +-
 .../cache/GridCacheEvictionManager.java         |   18 +-
 .../processors/cache/GridCacheIoManager.java    |   29 +-
 .../processors/cache/GridCacheMapEntry.java     |    1 -
 .../cache/GridCacheMvccCandidate.java           |   16 +-
 .../GridCachePartitionExchangeManager.java      |  154 +-
 .../processors/cache/GridCachePreloader.java    |   18 +-
 .../cache/GridCachePreloaderAdapter.java        |   12 +-
 .../processors/cache/GridCacheProcessor.java    |  216 +-
 .../processors/cache/GridCacheProxyImpl.java    |   13 +
 .../cache/GridCacheSharedContext.java           |   18 +-
 .../processors/cache/GridCacheUtils.java        |  122 -
 .../processors/cache/IgniteInternalCache.java   |    5 +
 .../cache/affinity/GridCacheAffinityImpl.java   |    2 +-
 .../CacheDataStructuresManager.java             |    4 +-
 .../distributed/GridCacheCommittedTxInfo.java   |    1 +
 .../GridDistributedCacheAdapter.java            |    2 +-
 .../GridDistributedLockResponse.java            |    8 -
 .../GridDistributedTxRemoteAdapter.java         |    9 +-
 .../dht/CacheDistributedGetFutureAdapter.java   |    3 -
 .../dht/GridClientPartitionTopology.java        |   10 +-
 .../dht/GridDhtAffinityAssignmentResponse.java  |  198 +-
 .../dht/GridDhtAssignmentFetchFuture.java       |   80 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   56 +-
 .../cache/distributed/dht/GridDhtGetFuture.java |    2 +
 .../distributed/dht/GridDhtGetSingleFuture.java |    2 +
 .../distributed/dht/GridDhtLockFuture.java      |    7 -
 .../dht/GridDhtPartitionTopology.java           |   10 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |  328 ++-
 .../dht/GridDhtTransactionalCacheAdapter.java   |    1 +
 .../distributed/dht/GridDhtTxFinishFuture.java  |   18 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |    6 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |    4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   10 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |    4 +
 .../dht/GridPartitionedGetFuture.java           |    8 +-
 .../dht/GridPartitionedSingleGetFuture.java     |    8 +-
 .../dht/atomic/GridDhtAtomicCache.java          |    1 +
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |    2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |    6 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   19 +-
 .../dht/preloader/GridDhtPartitionMap2.java     |    4 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |   29 +-
 .../GridDhtPartitionsExchangeFuture.java        | 1547 +++++-----
 .../dht/preloader/GridDhtPreloader.java         |  103 +-
 .../distributed/near/GridNearCacheAdapter.java  |    2 +-
 .../distributed/near/GridNearCacheEntry.java    |  118 +-
 .../distributed/near/GridNearGetFuture.java     |   26 +-
 ...arOptimisticSerializableTxPrepareFuture.java |    5 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   34 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |   16 +-
 .../near/GridNearTransactionalCache.java        |    7 +-
 .../near/GridNearTxFinishFuture.java            |   42 +-
 .../cache/distributed/near/GridNearTxLocal.java |   26 +-
 .../near/GridNearTxPrepareFutureAdapter.java    |    2 +-
 .../distributed/near/GridNearTxRemote.java      |   19 +
 .../processors/cache/dr/GridCacheDrManager.java |    4 +-
 .../cache/dr/GridOsCacheDrManager.java          |    2 +-
 .../cache/local/GridLocalCacheEntry.java        |   27 -
 .../cache/local/GridLocalLockFuture.java        |   15 -
 .../cache/query/GridCacheQueryManager.java      |   15 -
 .../cache/transactions/IgniteTxAdapter.java     |   14 +-
 .../cache/transactions/IgniteTxEntry.java       |    8 +-
 .../cache/transactions/IgniteTxHandler.java     |   16 +-
 .../transactions/IgniteTxLocalAdapter.java      |   97 +-
 .../cache/transactions/IgniteTxLocalEx.java     |   13 +-
 .../cache/transactions/IgniteTxManager.java     |   93 +-
 .../cache/version/GridCacheVersion.java         |   14 +-
 .../cache/version/GridCacheVersionEx.java       |    9 +
 .../cache/version/GridCacheVersionManager.java  |   21 +-
 .../continuous/GridContinuousProcessor.java     |    1 -
 .../datastructures/DataStructuresProcessor.java |    4 +-
 .../processors/odbc/OdbcMessageParser.java      |    2 +-
 .../platform/PlatformContextImpl.java           |    1 +
 .../service/GridServiceProcessor.java           |   64 +-
 .../ignite/internal/util/IgniteUtils.java       |   16 +
 .../internal/util/future/GridFutureAdapter.java |    3 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  127 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |    1 +
 .../GridCacheAffinityBackupsSelfTest.java       |    2 +-
 .../affinity/AffinityClientNodeSelfTest.java    |    4 +-
 .../fair/FairAffinityFunctionNodesSelfTest.java |    2 +
 .../ignite/internal/GridAffinitySelfTest.java   |    5 +-
 .../GridTaskFailoverAffinityRunTest.java        |    6 +-
 .../IgniteClientReconnectAbstractTest.java      |    8 +-
 .../IgniteClientReconnectAtomicsTest.java       |    8 +-
 .../IgniteClientReconnectCollectionsTest.java   |    4 +-
 .../IgniteClientReconnectComputeTest.java       |    6 +-
 .../IgniteClientReconnectFailoverTest.java      |    2 +
 .../IgniteClientReconnectServicesTest.java      |    4 +-
 .../IgniteClientReconnectStreamerTest.java      |    2 +-
 .../internal/TestRecordingCommunicationSpi.java |   18 +-
 .../cache/CacheAffinityCallSelfTest.java        |   45 +-
 .../processors/cache/CacheNamesSelfTest.java    |   16 +-
 ...cheNearUpdateTopologyChangeAbstractTest.java |    2 +
 .../cache/CacheReadThroughRestartSelfTest.java  |    2 +
 .../GridCacheAbstractRemoveFailureTest.java     |   10 +-
 .../cache/GridCacheDeploymentSelfTest.java      |    2 +
 .../cache/GridCacheEntryVersionSelfTest.java    |    2 +-
 ...ridCacheStoreManagerDeserializationTest.java |   40 +-
 ...acheTcpClientDiscoveryMultiThreadedTest.java |    2 +-
 .../GridCacheVersionTopologyChangeTest.java     |  246 ++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |  147 +-
 .../cache/IgniteCacheIncrementTxTest.java       |  299 ++
 .../IgniteCacheP2pUnmarshallingErrorTest.java   |    1 -
 ...CacheP2pUnmarshallingRebalanceErrorTest.java |   36 +-
 .../IgniteCacheP2pUnmarshallingTxErrorTest.java |    2 +
 .../IgniteClientAffinityAssignmentSelfTest.java |    2 +-
 ...niteDynamicCacheStartStopConcurrentTest.java |    6 +-
 .../cache/IgniteTxReentryAbstractSelfTest.java  |    2 +-
 ...eAbstractDataStructuresFailoverSelfTest.java |   12 +-
 .../CacheGetInsideLockChangingTopologyTest.java |    6 +
 ...eLateAffinityAssignmentFairAffinityTest.java |   32 +
 .../CacheLateAffinityAssignmentTest.java        | 2688 ++++++++++++++++++
 .../GridCacheAbstractJobExecutionTest.java      |    6 +-
 .../GridCacheTransformEventSelfTest.java        |    2 +-
 ...niteCacheClientNodeChangingTopologyTest.java |   28 +-
 ...teCacheClientNodePartitionsExchangeTest.java |   85 +-
 .../IgniteCacheClientReconnectTest.java         |    2 +
 .../IgniteCacheNearRestartRollbackSelfTest.java |   28 +-
 ...idCachePartitionedPreloadEventsSelfTest.java |   11 +
 ...ridCachePartitionedUnloadEventsSelfTest.java |    2 +
 .../IgniteCachePutRetryAbstractSelfTest.java    |    4 +-
 ...imaryWriteOrderMultiNodeFullApiSelfTest.java |   35 +
 .../near/GridCacheNearJobExecutionSelfTest.java |    2 -
 .../near/GridCacheNearMultiNodeSelfTest.java    |    4 +-
 .../near/GridCacheNearReadersSelfTest.java      |    2 +
 .../near/GridCacheNearTxForceKeyTest.java       |    6 +-
 ...LateAffDisabledMultiNodeFullApiSelfTest.java |   34 +
 ...achePartitionedMultiNodeCounterSelfTest.java |   43 +-
 ...achePartitionedMultiNodeFullApiSelfTest.java |    2 +
 ...idCacheRendezvousAffinityClientSelfTest.java |    2 +
 .../GridCacheRebalancingSyncSelfTest.java       |   83 +-
 .../GridCacheReplicatedJobExecutionTest.java    |    2 -
 ...ContinuousQueryFailoverAbstractSelfTest.java |   36 +-
 ...BehindStorePartitionedMultiNodeSelfTest.java |   11 +-
 .../loadtests/hashmap/GridCacheTestContext.java |    2 +
 .../testframework/junits/GridAbstractTest.java  |    3 +-
 .../junits/common/GridCommonAbstractTest.java   |  116 +-
 .../IgniteCacheFullApiSelfTestSuite.java        |    6 +
 .../testsuites/IgniteCacheTestSuite2.java       |    4 +
 .../testsuites/IgniteCacheTestSuite3.java       |    2 +
 .../testsuites/IgniteCacheTestSuite5.java       |    5 +
 .../processors/hadoop/HadoopContext.java        |    3 +-
 .../child/HadoopChildProcessRunner.java         |    2 +-
 .../HadoopIgfs20FileSystemAbstractSelfTest.java |    2 +
 .../IgniteHadoopFileSystemAbstractSelfTest.java |    2 +
 .../h2/twostep/GridReduceQueryExecutor.java     |    4 +-
 .../cache/IgniteClientReconnectQueriesTest.java |    4 +-
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |    3 +
 .../Continuous/ContinuousQueryAbstractTest.cs   |   25 +-
 .../Apache.Ignite.Core.Tests/ExceptionsTest.cs  |    5 +
 176 files changed, 8907 insertions(+), 2407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java
index cd12ab3..3af2a4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityFunction.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
 
 /**
  * Cache key affinity which maps keys to nodes. This interface is utilized for
@@ -47,6 +48,7 @@ import org.apache.ignite.cluster.ClusterNode;
  * {@link AffinityKeyMapped @AffinityKeyMapped} documentation.
  * @see AffinityKeyMapped
  * @see AffinityKeyMapper
+ * @see IgniteConfiguration#isLateAffinityAssignment()
  */
 public interface AffinityFunction extends Serializable {
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index e06978f..1aa3920 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -32,6 +32,8 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheKeyConfiguration;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
@@ -202,6 +204,9 @@ public class IgniteConfiguration {
     /** Default value for cache sanity check enabled flag. */
     public static final boolean DFLT_CACHE_SANITY_CHECK_ENABLED = true;
 
+    /** Default value for late affinity assignment flag. */
+    public static final boolean DFLT_LATE_AFF_ASSIGNMENT = true;
+
     /** Default failure detection timeout in millis. */
     @SuppressWarnings("UnnecessaryBoxing")
     public static final Long DFLT_FAILURE_DETECTION_TIMEOUT = new Long(10_000);
@@ -441,6 +446,9 @@ public class IgniteConfiguration {
     /** */
     private BinaryConfiguration binaryCfg;
 
+    /** */
+    private boolean lateAffAssignment = DFLT_LATE_AFF_ASSIGNMENT;
+
     /**
      * Creates valid grid configuration with all default values.
      */
@@ -497,6 +505,7 @@ public class IgniteConfiguration {
         hadoopCfg = cfg.getHadoopConfiguration();
         inclEvtTypes = cfg.getIncludeEventTypes();
         includeProps = cfg.getIncludeProperties();
+        lateAffAssignment = cfg.isLateAffinityAssignment();
         lifecycleBeans = cfg.getLifecycleBeans();
         locHost = cfg.getLocalHost();
         log = cfg.getGridLogger();
@@ -2519,6 +2528,47 @@ public class IgniteConfiguration {
         this.platformCfg = platformCfg;
     }
 
+    /**
+     * Whether or not late affinity assignment mode should be used.
+     * <p>
+     * On each topology change, for each started cache partition-to-node mapping is
+     * calculated using {@link AffinityFunction} configured for cache. When late
+     * affinity assignment mode is disabled then new affinity mapping is applied immediately.
+     * <p>
+     * With late affinity assignment mode if primary node was changed for some partition, but data for this
+     * partition is not rebalanced yet on this node, then current primary is not changed and new primary is temporarily
+     * assigned as backup. This node becomes primary only when rebalancing for all assigned primary partitions is
+     * finished. This mode can show better performance for cache operations, since when cache primary node
+     * executes some operation and data is not rebalanced yet, then it sends additional message to force rebalancing
+     * from other nodes.
+     * <p>
+     * Note, that {@link Affinity} interface provides assignment information taking into account late assignment,
+     * so while rebalancing for new primary nodes is not finished it can return assignment which differs
+     * from assignment calculated by {@link AffinityFunction#assignPartitions}.
+     * <p>
+     * This property should have the same value for all nodes in cluster.
+     * <p>
+     * If not provided, default value is {@link #DFLT_LATE_AFF_ASSIGNMENT}.
+     *
+     * @return Late affinity assignment flag.
+     * @see AffinityFunction
+     */
+    public boolean isLateAffinityAssignment() {
+        return lateAffAssignment;
+    }
+
+    /**
+     * Sets late affinity assignment flag.
+     *
+     * @param lateAffAssignment Late affinity assignment flag.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setLateAffinityAssignment(boolean lateAffAssignment) {
+        this.lateAffAssignment = lateAffAssignment;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index cec4b74..20795fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -191,6 +191,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -1265,6 +1266,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         add(ATTR_MARSHALLER, cfg.getMarshaller().getClass().getName());
         add(ATTR_MARSHALLER_USE_DFLT_SUID,
             getBoolean(IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID, OptimizedMarshaller.USE_DFLT_SUID));
+        add(ATTR_LATE_AFFINITY_ASSIGNMENT, cfg.isLateAffinityAssignment());
 
         if (cfg.getMarshaller() instanceof BinaryMarshaller) {
             add(ATTR_MARSHALLER_COMPACT_FOOTER, cfg.getBinaryConfiguration() == null ?

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index da6f40d..3493eae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -150,6 +150,9 @@ public final class IgniteNodeAttributes {
     /** Binary configuration. */
     public static final String ATTR_BINARY_CONFIGURATION = ATTR_PREFIX + ".binary.config";
 
+    /** Late affinity assignment mode. */
+    public static final String ATTR_LATE_AFFINITY_ASSIGNMENT = ATTR_PREFIX + ".cache.lateAffinity";
+
     /**
      * Enforces singleton.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 173bb6e..0997d6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -222,7 +222,23 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
 
     /** {@inheritDoc} */
     @Override public int typeId() {
-        return BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.TYPE_ID_POS);
+        int off = start + GridBinaryMarshaller.TYPE_ID_POS;
+
+        int typeId = BinaryPrimitives.readInt(arr, off);
+
+        if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
+            off = start + GridBinaryMarshaller.DFLT_HDR_LEN;
+
+            assert arr[off] == GridBinaryMarshaller.STRING : arr[off];
+
+            int len = BinaryPrimitives.readInt(arr, ++off);
+
+            String clsName = new String(arr, off + 4, len, UTF_8);
+
+            typeId = ctx.typeId(clsName);
+        }
+
+        return typeId;
     }
 
     /** {@inheritDoc} */
@@ -236,13 +252,13 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override public <F> F field(String fieldName) throws BinaryObjectException {
-        return (F) reader(null).unmarshalField(fieldName);
+        return (F) reader(null, false).unmarshalField(fieldName);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override public <F> F field(int fieldId) throws BinaryObjectException {
-        return (F) reader(null).unmarshalField(fieldId);
+        return (F) reader(null, false).unmarshalField(fieldId);
     }
 
     /** {@inheritDoc} */
@@ -251,20 +267,20 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
         Object val;
 
         // Calculate field position.
-        int schemaOffset = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
+        int schemaOff = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
 
         short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS);
 
         int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN;
-        int fieldOffsetLen = BinaryUtils.fieldOffsetLength(flags);
+        int fieldOffLen = BinaryUtils.fieldOffsetLength(flags);
 
-        int fieldOffsetPos = start + schemaOffset + order * (fieldIdLen + fieldOffsetLen) + fieldIdLen;
+        int fieldOffsetPos = start + schemaOff + order * (fieldIdLen + fieldOffLen) + fieldIdLen;
 
         int fieldPos;
 
-        if (fieldOffsetLen == BinaryUtils.OFFSET_1)
+        if (fieldOffLen == BinaryUtils.OFFSET_1)
             fieldPos = start + ((int)BinaryPrimitives.readByte(arr, fieldOffsetPos) & 0xFF);
-        else if (fieldOffsetLen == BinaryUtils.OFFSET_2)
+        else if (fieldOffLen == BinaryUtils.OFFSET_2)
             fieldPos = start + ((int)BinaryPrimitives.readShort(arr, fieldOffsetPos) & 0xFFFF);
         else
             fieldPos = start + BinaryPrimitives.readInt(arr, fieldOffsetPos);
@@ -387,12 +403,12 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override protected <F> F field(BinaryReaderHandles rCtx, String fieldName) {
-        return (F)reader(rCtx).unmarshalField(fieldName);
+        return (F)reader(rCtx, false).unmarshalField(fieldName);
     }
 
     /** {@inheritDoc} */
     @Override public boolean hasField(String fieldName) {
-        return reader(null).findFieldByName(fieldName);
+        return reader(null, false).findFieldByName(fieldName);
     }
 
     /** {@inheritDoc} */
@@ -423,7 +439,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
 
     /** {@inheritDoc} */
     @Override protected BinarySchema createSchema() {
-        return reader(null).getOrCreateSchema();
+        return reader(null, false).getOrCreateSchema();
     }
 
     /** {@inheritDoc} */
@@ -537,7 +553,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
      */
     private Object deserializeValue(@Nullable CacheObjectContext coCtx) {
         BinaryReaderExImpl reader = reader(null,
-            coCtx != null ? coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader());
+            coCtx != null ? coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader(), true);
 
         Object obj0 = reader.deserialize();
 
@@ -563,25 +579,29 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
      * Create new reader for this object.
      *
      * @param rCtx Reader context.
+     * @param ldr Class loader.
+     * @param forUnmarshal {@code True} if reader is needed to unmarshal object.
      * @return Reader.
      */
-    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr) {
+    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr, boolean forUnmarshal) {
         if (ldr == null)
             ldr = ctx.configuration().getClassLoader();
 
         return new BinaryReaderExImpl(ctx,
             BinaryHeapInputStream.create(arr, start),
             ldr,
-            rCtx);
+            rCtx,
+            forUnmarshal);
     }
 
     /**
      * Create new reader for this object.
      *
      * @param rCtx Reader context.
+     * @param forUnmarshal {@code True} if reader is needed to unmarshal object.
      * @return Reader.
      */
-    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx) {
-        return reader(rCtx, null);
+    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, boolean forUnmarshal) {
+        return reader(rCtx, null, forUnmarshal);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
index 27d3012..c687192 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
@@ -91,7 +91,17 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
 
     /** {@inheritDoc} */
     @Override public int typeId() {
-        return BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.TYPE_ID_POS);
+        int typeId = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.TYPE_ID_POS);
+
+        if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
+            int off = start + GridBinaryMarshaller.DFLT_HDR_LEN;
+
+            String clsName = BinaryUtils.doReadClassName(new BinaryOffheapInputStream(off, size));
+
+            typeId = ctx.typeId(clsName);
+        }
+
+        return typeId;
     }
 
     /** {@inheritDoc} */
@@ -111,7 +121,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
 
     /** {@inheritDoc} */
     @Override protected BinarySchema createSchema() {
-        return reader(null).getOrCreateSchema();
+        return reader(null, false).getOrCreateSchema();
     }
 
     /** {@inheritDoc} */
@@ -145,13 +155,13 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override public <F> F field(String fieldName) throws BinaryObjectException {
-        return (F) reader(null).unmarshalField(fieldName);
+        return (F) reader(null, false).unmarshalField(fieldName);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override public <F> F field(int fieldId) throws BinaryObjectException {
-        return (F) reader(null).unmarshalField(fieldId);
+        return (F) reader(null, false).unmarshalField(fieldId);
     }
 
     /** {@inheritDoc} */
@@ -160,20 +170,20 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
         Object val;
 
         // Calculate field position.
-        int schemaOffset = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
+        int schemaOff = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
 
         short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
 
         int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN;
-        int fieldOffsetLen = BinaryUtils.fieldOffsetLength(flags);
+        int fieldOffLen = BinaryUtils.fieldOffsetLength(flags);
 
-        int fieldOffsetPos = start + schemaOffset + order * (fieldIdLen + fieldOffsetLen) + fieldIdLen;
+        int fieldOffsetPos = start + schemaOff + order * (fieldIdLen + fieldOffLen) + fieldIdLen;
 
         int fieldPos;
 
-        if (fieldOffsetLen == BinaryUtils.OFFSET_1)
+        if (fieldOffLen == BinaryUtils.OFFSET_1)
             fieldPos = start + ((int)BinaryPrimitives.readByte(ptr, fieldOffsetPos) & 0xFF);
-        else if (fieldOffsetLen == BinaryUtils.OFFSET_2)
+        else if (fieldOffLen == BinaryUtils.OFFSET_2)
             fieldPos = start + ((int)BinaryPrimitives.readShort(ptr, fieldOffsetPos) & 0xFFFF);
         else
             fieldPos = start + BinaryPrimitives.readInt(ptr, fieldOffsetPos);
@@ -301,12 +311,12 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override protected <F> F field(BinaryReaderHandles rCtx, String fieldName) {
-        return (F)reader(rCtx).unmarshalField(fieldName);
+        return (F)reader(rCtx, false).unmarshalField(fieldName);
     }
 
     /** {@inheritDoc} */
     @Override public boolean hasField(String fieldName) {
-        return reader(null).findFieldByName(fieldName);
+        return reader(null, false).findFieldByName(fieldName);
     }
 
     /** {@inheritDoc} */
@@ -401,16 +411,17 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
      * @return Deserialized value.
      */
     private Object deserializeValue() {
-        return reader(null).deserialize();
+        return reader(null, true).deserialize();
     }
 
     /**
      * Create new reader for this object.
      *
      * @param rCtx Reader context.
+     * @param forUnmarshal {@code True} if reader is needed to unmarshal object.
      * @return Reader.
      */
-    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx) {
+    private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, boolean forUnmarshal) {
         BinaryOffheapInputStream stream = new BinaryOffheapInputStream(ptr, size, false);
 
         stream.position(start);
@@ -418,6 +429,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
         return new BinaryReaderExImpl(ctx,
             stream,
             ctx.configuration().getClassLoader(),
-            rCtx);
+            rCtx,
+            forUnmarshal);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
index f9e7aa5..69aecbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
@@ -127,7 +127,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
     private final int fieldIdLen;
 
     /** Offset size in bytes. */
-    private final int fieldOffsetLen;
+    private final int fieldOffLen;
 
     /** Object schema. */
     private final BinarySchema schema;
@@ -147,9 +147,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
      * @param ctx Context.
      * @param in Input stream.
      * @param ldr Class loader.
+     * @param forUnmarshal {@code True} if reader is needed to unmarshal object.
      */
-    public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr) {
-        this(ctx, in, ldr, null);
+    public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr, boolean forUnmarshal) {
+        this(ctx,
+            in,
+            ldr,
+            null,
+            forUnmarshal);
     }
 
     /**
@@ -159,10 +164,19 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
      * @param in Input stream.
      * @param ldr Class loader.
      * @param hnds Context.
+     * @param forUnmarshal {@code True} if reader is needed to unmarshal object.
      */
-    public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr,
-        @Nullable BinaryReaderHandles hnds) {
-        this(ctx, in, ldr, hnds, false);
+    public BinaryReaderExImpl(BinaryContext ctx,
+        BinaryInputStream in,
+        ClassLoader ldr,
+        @Nullable BinaryReaderHandles hnds,
+        boolean forUnmarshal) {
+        this(ctx,
+            in,
+            ldr,
+            hnds,
+            false,
+            forUnmarshal);
     }
 
     /**
@@ -173,9 +187,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
      * @param ldr Class loader.
      * @param hnds Context.
      * @param skipHdrCheck Whether to skip header check.
-     */
-    public BinaryReaderExImpl(BinaryContext ctx, BinaryInputStream in, ClassLoader ldr,
-        @Nullable BinaryReaderHandles hnds, boolean skipHdrCheck) {
+     * @param forUnmarshal {@code True} if reader is needed to unmarshal object.
+     */
+    public BinaryReaderExImpl(BinaryContext ctx,
+        BinaryInputStream in,
+        ClassLoader ldr,
+        @Nullable BinaryReaderHandles hnds,
+        boolean skipHdrCheck,
+        boolean forUnmarshal) {
         // Initialize base members.
         this.ctx = ctx;
         this.in = in;
@@ -202,7 +221,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
             // Get trivial flag values.
             userType = BinaryUtils.isUserType(flags);
             fieldIdLen = BinaryUtils.fieldIdLength(flags);
-            fieldOffsetLen = BinaryUtils.fieldOffsetLength(flags);
+            fieldOffLen = BinaryUtils.fieldOffsetLength(flags);
 
             // Calculate footer borders and raw offset.
             if (BinaryUtils.hasSchema(flags)) {
@@ -233,8 +252,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
             if (typeId0 == UNREGISTERED_TYPE_ID) {
                 int off = in.position();
 
-                // Registers class by type ID, at least locally if the cache is not ready yet.
-                typeId = ctx.descriptorForClass(BinaryUtils.doReadClass(in, ctx, ldr, typeId0), false).typeId();
+                if (forUnmarshal) {
+                    // Registers class by type ID, at least locally if the cache is not ready yet.
+                    typeId = ctx.descriptorForClass(BinaryUtils.doReadClass(in, ctx, ldr, typeId0), false).typeId();
+                }
+                else
+                    typeId = ctx.typeId(BinaryUtils.doReadClassName(in));
 
                 int clsNameLen = in.position() - off;
 
@@ -259,7 +282,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
             schemaId = 0;
             userType = false;
             fieldIdLen = 0;
-            fieldOffsetLen = 0;
+            fieldOffLen = 0;
             schema = null;
         }
 
@@ -1640,6 +1663,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
     }
 
     /**
+     * @param fieldId Field ID.
      * @return Deserialized object.
      * @throws BinaryObjectException If failed.
      */
@@ -1647,7 +1671,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
         if (!findFieldById(fieldId))
             return null;
 
-        return new BinaryReaderExImpl(ctx, in, ldr, hnds).deserialize();
+        return new BinaryReaderExImpl(ctx, in, ldr, hnds, true).deserialize();
     }
 
     /**
@@ -1717,7 +1741,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
 
             builder.addField(fieldId);
 
-            searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffsetLen;
+            searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffLen;
         }
 
         return builder.build();
@@ -1851,9 +1875,9 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
      */
     private boolean trySetUserFieldPosition(int order) {
         if (order != BinarySchema.ORDER_NOT_FOUND) {
-            int offsetPos = footerStart + order * (fieldIdLen + fieldOffsetLen) + fieldIdLen;
+            int offsetPos = footerStart + order * (fieldIdLen + fieldOffLen) + fieldIdLen;
 
-            int pos = start + BinaryUtils.fieldOffsetRelative(in, offsetPos, fieldOffsetLen);
+            int pos = start + BinaryUtils.fieldOffsetRelative(in, offsetPos, fieldOffLen);
 
             streamPosition(pos);
 
@@ -1884,14 +1908,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
 
             if (id0 == id) {
                 int pos = start + BinaryUtils.fieldOffsetRelative(in, searchPos + BinaryUtils.FIELD_ID_LEN,
-                    fieldOffsetLen);
+                    fieldOffLen);
 
                 streamPosition(pos);
 
                 return true;
             }
 
-            searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffsetLen;
+            searchPos += BinaryUtils.FIELD_ID_LEN + fieldOffLen;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java
index d4fd625..132702c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryTypeImpl.java
@@ -20,12 +20,15 @@ package org.apache.ignite.internal.binary;
 import org.apache.ignite.binary.BinaryType;
 
 import java.util.Collection;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Binary type implementation.
  */
 public class BinaryTypeImpl implements BinaryType {
     /** Binary context. */
+    @GridToStringExclude
     private final BinaryContext ctx;
 
     /** Type metadata. */
@@ -90,4 +93,9 @@ public class BinaryTypeImpl implements BinaryType {
     public BinaryMetadata metadata() {
         return meta;
     }
+
+    /** {@inheritDoc} */
+    public String toString() {
+        return S.toString(BinaryTypeImpl.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index 37f1d6a..c0202dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -1390,7 +1390,7 @@ public class BinaryUtils {
      * @param in Input stream.
      * @return Class name.
      */
-    private static String doReadClassName(BinaryInputStream in) {
+    public static String doReadClassName(BinaryInputStream in) {
         byte flag = in.readByte();
 
         if (flag != GridBinaryMarshaller.STRING)
@@ -1565,7 +1565,7 @@ public class BinaryUtils {
      */
     @Nullable public static Object doReadObject(BinaryInputStream in, BinaryContext ctx, ClassLoader ldr,
         BinaryReaderHandlesHolder handles) throws BinaryObjectException {
-        return new BinaryReaderExImpl(ctx, in, ldr, handles.handles()).deserialize();
+        return new BinaryReaderExImpl(ctx, in, ldr, handles.handles(), true).deserialize();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
index 67e741b..00d8871 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java
@@ -292,7 +292,7 @@ public class GridBinaryMarshaller {
         BinaryContext oldCtx = pushContext(ctx);
 
         try {
-            return (T)new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, 0), ldr).deserialize();
+            return (T)new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, 0), ldr, true).deserialize();
         }
         finally {
             popContext(oldCtx);
@@ -340,7 +340,7 @@ public class GridBinaryMarshaller {
     public BinaryReaderExImpl reader(BinaryInputStream stream) {
         assert stream != null;
 
-        return new BinaryReaderExImpl(ctx, stream, null);
+        return new BinaryReaderExImpl(ctx, stream, null, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
index 662ad1d..347fb2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryBuilderReader.java
@@ -55,7 +55,7 @@ public class BinaryBuilderReader implements BinaryPositionReadable {
     /** */
     private int pos;
 
-    /*
+    /**
      * Constructor.
      *
      * @param objImpl Binary object
@@ -67,7 +67,8 @@ public class BinaryBuilderReader implements BinaryPositionReadable {
 
         reader = new BinaryReaderExImpl(ctx,
             BinaryHeapInputStream.create(arr, pos),
-            ctx.configuration().getClassLoader());
+            ctx.configuration().getClassLoader(),
+            false);
 
         objMap = new HashMap<>();
     }
@@ -83,7 +84,11 @@ public class BinaryBuilderReader implements BinaryPositionReadable {
         this.arr = other.arr;
         this.pos = start;
 
-        reader = new BinaryReaderExImpl(ctx, BinaryHeapInputStream.create(arr, start), null, other.reader.handles());
+        reader = new BinaryReaderExImpl(ctx,
+            BinaryHeapInputStream.create(arr, start),
+            null,
+            other.reader.handles(),
+            false);
 
         this.objMap = other.objMap;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3c7f378..47b1c5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -26,13 +26,13 @@ import org.apache.ignite.internal.GridJobSiblingsRequest;
 import org.apache.ignite.internal.GridJobSiblingsResponse;
 import org.apache.ignite.internal.GridTaskCancelRequest;
 import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
-import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
-import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsValue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 5b56d38..7f02498 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
 import org.apache.ignite.internal.processors.security.SecurityContext;
@@ -124,6 +125,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
 import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
@@ -508,9 +510,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                         verChanged = true;
                     }
+
+                    nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
                 }
+                else {
+                    nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
 
-                nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
+                    ctx.cache().onDiscoveryEvent(type, node, nextTopVer);
+                }
 
                 if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
                     for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) {
@@ -996,6 +1003,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         Boolean locMarshUseDfltSuid = locNode.attribute(ATTR_MARSHALLER_USE_DFLT_SUID);
         boolean locMarshUseDfltSuidBool = locMarshUseDfltSuid == null ? true : locMarshUseDfltSuid;
 
+        boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
+
         for (ClusterNode n : nodes) {
             int rmtJvmMajVer = nodeJavaMajorVersion(n);
 
@@ -1054,6 +1063,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     ", rmtNodeAddrs=" + U.addressesAsString(n) +
                     ", locNodeId=" + locNode.id() + ", rmtNodeId=" + n.id() + ']');
             }
+
+            boolean rmtLateAssign;
+
+            if (n.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0)
+                rmtLateAssign = n.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
+            else
+                rmtLateAssign = false;
+
+            if (locDelayAssign != rmtLateAssign) {
+                throw new IgniteCheckedException("Remote node has cache affinity assignment mode different from local " +
+                    "[locId8=" +  U.id8(locNode.id()) +
+                    ", locDelayAssign=" + locDelayAssign +
+                    ", rmtId8=" + U.id8(n.id()) +
+                    ", rmtLateAssign=" + rmtLateAssign +
+                    ", rmtAddrs=" + U.addressesAsString(n) + ']');
+            }
         }
 
         if (log.isDebugEnabled())
@@ -1456,7 +1481,36 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @return Collection of cache nodes.
      */
     public Collection<ClusterNode> nodes(long topVer) {
-        return resolveDiscoCache(null, new AffinityTopologyVersion(topVer)).allNodes();
+        return nodes(new AffinityTopologyVersion(topVer));
+    }
+
+    /**
+     * Gets all nodes for given topology version.
+     *
+     * @param topVer Topology version.
+     * @return Collection of cache nodes.
+     */
+    public Collection<ClusterNode> nodes(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(null, topVer).allNodes();
+    }
+
+    /**
+     * @param topVer Topology version.
+     * @return All server nodes for given topology version.
+     */
+    public List<ClusterNode> serverNodes(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(null, topVer).srvNodes;
+    }
+
+    /**
+     * Gets node from history for given topology version.
+     *
+     * @param topVer Topology version.
+     * @param id Node ID.
+     * @return Node.
+     */
+    public ClusterNode node(AffinityTopologyVersion topVer, UUID id) {
+        return resolveDiscoCache(null, topVer).node(id);
     }
 
     /**
@@ -2394,6 +2448,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         /** All nodes. */
         private final List<ClusterNode> allNodes;
 
+        /** All server nodes. */
+        private final List<ClusterNode> srvNodes;
+
         /** All nodes with at least one cache configured. */
         @GridToStringInclude
         private final Collection<ClusterNode> allNodesWithCaches;
@@ -2500,8 +2557,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
             Set<String> nearEnabledSet = new HashSet<>();
 
+            List<ClusterNode> srvNodes = new ArrayList<>();
+
             for (ClusterNode node : allNodes) {
                 assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']';
+                assert !node.isDaemon();
+
+                if (!CU.clientNode(node))
+                    srvNodes.add(node);
 
                 if (node.order() > maxOrder0)
                     maxOrder0 = node.order();
@@ -2568,6 +2631,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 nodes.add(node);
             }
 
+            Collections.sort(srvNodes, CU.nodeComparator(true));
+
             // Need second iteration to add this node to all previous node versions.
             for (ClusterNode node : allNodes) {
                 IgniteProductVersion nodeVer = U.productVersion(node);
@@ -2588,6 +2653,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches);
             this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches);
             nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet);
+            this.srvNodes = Collections.unmodifiableList(srvNodes);
 
             daemonNodes = Collections.unmodifiableList(new ArrayList<>(
                 F.view(F.concat(false, loc, rmts), F0.not(FILTER_DAEMON))));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 7b2bea3..92908cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -51,6 +51,12 @@ public class GridAffinityAssignment implements Serializable {
     /** Assignment node IDs */
     private transient volatile List<HashSet<UUID>> assignmentIds;
 
+    /** Nodes having primary partitions assignments. */
+    private transient volatile Set<ClusterNode> primaryPartsNodes;
+
+    /** */
+    private transient List<List<ClusterNode>> idealAssignment;
+
     /**
      * Constructs cached affinity calculations item.
      *
@@ -65,10 +71,18 @@ public class GridAffinityAssignment implements Serializable {
     /**
      * @param topVer Topology version.
      * @param assignment Assignment.
+     * @param idealAssignment Ideal assignment.
      */
-    GridAffinityAssignment(AffinityTopologyVersion topVer, List<List<ClusterNode>> assignment) {
+    GridAffinityAssignment(AffinityTopologyVersion topVer,
+        List<List<ClusterNode>> assignment,
+        List<List<ClusterNode>> idealAssignment) {
+        assert topVer != null;
+        assert assignment != null;
+        assert idealAssignment != null;
+
         this.topVer = topVer;
         this.assignment = assignment;
+        this.idealAssignment = idealAssignment;
 
         primary = new HashMap<>();
         backup = new HashMap<>();
@@ -84,11 +98,19 @@ public class GridAffinityAssignment implements Serializable {
         this.topVer = topVer;
 
         assignment = aff.assignment;
+        idealAssignment = aff.idealAssignment;
         primary = aff.primary;
         backup = aff.backup;
     }
 
     /**
+     * @return Affinity assignment computed by affinity function.
+     */
+    public List<List<ClusterNode>> idealAssignment() {
+        return idealAssignment;
+    }
+
+    /**
      * @return Affinity assignment.
      */
     public List<List<ClusterNode>> assignment() {
@@ -146,6 +168,31 @@ public class GridAffinityAssignment implements Serializable {
     }
 
     /**
+     * @return Nodes having primary partitions assignments.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    public Set<ClusterNode> primaryPartitionNodes() {
+        Set<ClusterNode> primaryPartsNodes0 = primaryPartsNodes;
+
+        if (primaryPartsNodes0 == null) {
+            int parts = assignment.size();
+
+            primaryPartsNodes0 = new HashSet<>();
+
+            for (int p = 0; p < parts; p++) {
+                List<ClusterNode> nodes = assignment.get(p);
+
+                if (nodes.size() > 0)
+                    primaryPartsNodes0.add(nodes.get(0));
+            }
+
+            primaryPartsNodes = primaryPartsNodes0;
+        }
+
+        return primaryPartsNodes0;
+    }
+
+    /**
      * Get primary partitions for specified node ID.
      *
      * @param nodeId Node ID to get primary partitions for.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 26e4d98..0cacf68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -25,27 +25,28 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
 import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
-import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedHashMap;
 
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 
@@ -56,32 +57,41 @@ public class GridAffinityAssignmentCache {
     /** Cache name. */
     private final String cacheName;
 
+    /** */
+    private final Integer cacheId;
+
     /** Number of backups. */
-    private int backups;
+    private final int backups;
 
     /** Affinity function. */
     private final AffinityFunction aff;
 
+    /** */
+    private final IgnitePredicate<ClusterNode> nodeFilter;
+
     /** Partitions count. */
     private final int partsCnt;
 
-    /** Affinity mapper function. */
-    private final AffinityKeyMapper affMapper;
-
     /** Affinity calculation results cache: topology version => partition => nodes. */
-    private final ConcurrentLinkedHashMap<AffinityTopologyVersion, GridAffinityAssignment> affCache;
+    private final ConcurrentNavigableMap<AffinityTopologyVersion, GridAffinityAssignment> affCache;
+
+    /** */
+    private List<List<ClusterNode>> idealAssignment;
 
     /** Cache item corresponding to the head topology version. */
     private final AtomicReference<GridAffinityAssignment> head;
 
-    /** Discovery manager. */
-    private final GridCacheContext ctx;
-
     /** Ready futures. */
     private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
 
     /** Log. */
-    private IgniteLogger log;
+    private final IgniteLogger log;
+
+    /** */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final boolean locCache;
 
     /** Node stop flag. */
     private volatile IgniteCheckedException stopErr;
@@ -92,42 +102,62 @@ public class GridAffinityAssignmentCache {
      * @param ctx Kernal context.
      * @param cacheName Cache name.
      * @param aff Affinity function.
-     * @param affMapper Affinity key mapper.
+     * @param nodeFilter Node filter.
      * @param backups Number of backups.
+     * @param locCache Local cache flag.
      */
     @SuppressWarnings("unchecked")
-    public GridAffinityAssignmentCache(GridCacheContext ctx,
+    public GridAffinityAssignmentCache(GridKernalContext ctx,
         String cacheName,
         AffinityFunction aff,
-        AffinityKeyMapper affMapper,
-        int backups)
+        IgnitePredicate<ClusterNode> nodeFilter,
+        int backups,
+        boolean locCache)
     {
         assert ctx != null;
         assert aff != null;
-        assert affMapper != null;
 
         this.ctx = ctx;
         this.aff = aff;
-        this.affMapper = affMapper;
+        this.nodeFilter = nodeFilter;
         this.cacheName = cacheName;
         this.backups = backups;
+        this.locCache = locCache;
+
+        cacheId = CU.cacheId(cacheName);
 
-        log = ctx.logger(GridAffinityAssignmentCache.class);
+        log = ctx.log(GridAffinityAssignmentCache.class);
 
         partsCnt = aff.partitions();
-        affCache = new ConcurrentLinkedHashMap<>();
+        affCache = new ConcurrentSkipListMap<>();
         head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
     }
 
     /**
-     * Initializes affinity with given topology version and assignment. The assignment is calculated on remote nodes
-     * and brought to local node on partition map exchange.
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Cache ID.
+     */
+    public Integer cacheId() {
+        return cacheId;
+    }
+
+    /**
+     * Initializes affinity with given topology version and assignment.
      *
      * @param topVer Topology version.
      * @param affAssignment Affinity assignment for topology version.
      */
     public void initialize(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
-        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment);
+        assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
+        assert idealAssignment != null;
+
+        GridAffinityAssignment assignment = new GridAffinityAssignment(topVer, affAssignment, idealAssignment);
 
         affCache.put(topVer, assignment);
         head.set(assignment);
@@ -144,11 +174,32 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * @param assignment Assignment.
+     */
+    public void idealAssignment(List<List<ClusterNode>> assignment) {
+        this.idealAssignment = assignment;
+    }
+
+    /**
+     * @return Assignment.
+     */
+    @Nullable public List<List<ClusterNode>> idealAssignment() {
+        return idealAssignment;
+    }
+
+    /**
+     * @return {@code True} if affinity function has {@link AffinityCentralizedFunction} annotation.
+     */
+    public boolean centralizedAffinityFunction() {
+        return U.hasAnnotation(aff, AffinityCentralizedFunction.class);
+    }
+
+    /**
      * Kernal stop callback.
      *
      * @param err Error.
      */
-    public void onKernalStop(IgniteCheckedException err) {
+    public void cancelFutures(IgniteCheckedException err) {
         stopErr = err;
 
         for (AffinityReadyFuture fut : readyFuts.values())
@@ -159,6 +210,8 @@ public class GridAffinityAssignmentCache {
      *
      */
     public void onReconnected() {
+        idealAssignment = null;
+
         affCache.clear();
 
         head.set(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
@@ -179,33 +232,23 @@ public class GridAffinityAssignmentCache {
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
                 ", discoEvt=" + discoEvt + ']');
 
-        Iterator<AffinityTopologyVersion> it = affCache.descendingKeySet().iterator();
-
-        AffinityTopologyVersion prevVer = null;
-
-        if (it.hasNext())
-            prevVer = it.next();
-
-        GridAffinityAssignment prev = prevVer == null ? null : affCache.get(prevVer);
+        List<List<ClusterNode>> prevAssignment = idealAssignment;
 
+        // Resolve nodes snapshot for specified topology version.
         List<ClusterNode> sorted;
 
-        if (ctx.isLocal())
-            // For local cache always use local node.
-            sorted = Collections.singletonList(ctx.localNode());
-        else {
-            // Resolve nodes snapshot for specified topology version.
+        if (!locCache) {
             sorted = new ArrayList<>(ctx.discovery().cacheAffinityNodes(cacheName, topVer));
 
             Collections.sort(sorted, GridNodeOrderComparator.INSTANCE);
         }
-
-        List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment();
+        else
+            sorted = Collections.singletonList(ctx.discovery().localNode());
 
         List<List<ClusterNode>> assignment;
 
         if (prevAssignment != null && discoEvt != null) {
-            boolean affNode = ctx.discovery().cacheAffinityNode(discoEvt.eventNode(), ctx.name());
+            boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter);
 
             if (!affNode)
                 assignment = prevAssignment;
@@ -219,32 +262,12 @@ public class GridAffinityAssignmentCache {
 
         assert assignment != null;
 
-        GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);
-
-        updated = F.addIfAbsent(affCache, topVer, updated);
-
-        // Update top version, if required.
-        while (true) {
-            GridAffinityAssignment headItem = head.get();
-
-            if (headItem.topologyVersion().compareTo(topVer) >= 0)
-                break;
-
-            if (head.compareAndSet(headItem, updated))
-                break;
-        }
-
-        for (Map.Entry<AffinityTopologyVersion, AffinityReadyFuture> entry : readyFuts.entrySet()) {
-            if (entry.getKey().compareTo(topVer) <= 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Completing topology ready future (calculated affinity) [locNodeId=" + ctx.localNodeId() +
-                        ", futVer=" + entry.getKey() + ", topVer=" + topVer + ']');
+        idealAssignment = assignment;
 
-                entry.getValue().onDone(topVer);
-            }
-        }
+        if (locCache)
+            initialize(topVer, assignment);
 
-        return updated.assignment();
+        return assignment;
     }
 
     /**
@@ -255,6 +278,8 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version.
      */
     public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
+        assert topVer.compareTo(lastVersion()) >= 0 : "[topVer = " + topVer + ", last=" + lastVersion() + ']';
+
         GridAffinityAssignment aff = head.get();
 
         assert evt.type() == EVT_DISCOVERY_CUSTOM_EVT  || aff.primaryPartitions(evt.eventNode().id()).isEmpty() : evt;
@@ -293,9 +318,10 @@ public class GridAffinityAssignmentCache {
             log.debug("Cleaning up cache for version [locNodeId=" + ctx.localNodeId() +
                 ", topVer=" + topVer + ']');
 
-        for (Iterator<AffinityTopologyVersion> it = affCache.keySet().iterator(); it.hasNext(); )
+        for (Iterator<AffinityTopologyVersion> it = affCache.keySet().iterator(); it.hasNext(); ) {
             if (it.next().compareTo(topVer) < 0)
                 it.remove();
+        }
     }
 
     /**
@@ -351,32 +377,6 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
-     * NOTE: Use this method always when you need to calculate partition id for
-     * a key provided by user. It's required since we should apply affinity mapper
-     * logic in order to find a key that will eventually be passed to affinity function.
-     *
-     * @param key Key.
-     * @return Partition.
-     */
-    public int partition(Object key) {
-        return aff.partition(affinityKey(key));
-    }
-
-    /**
-     * If Key is {@link GridCacheInternal GridCacheInternal} entry when won't passed into user's mapper and
-     * will use {@link GridCacheDefaultAffinityKeyMapper default}.
-     *
-     * @param key Key.
-     * @return Affinity key.
-     */
-    private Object affinityKey(Object key) {
-        if (key instanceof CacheObject && !(key instanceof BinaryObject))
-            key = ((CacheObject)key).value(ctx.cacheObjectContext(), false);
-
-        return (key instanceof GridCacheInternal ? ctx.defaultAffMapper() : affMapper).affinityKey(key);
-    }
-
-    /**
      * Gets affinity nodes for specified partition.
      *
      * @param part Partition.
@@ -415,7 +415,7 @@ public class GridAffinityAssignmentCache {
      */
     public void dumpDebugInfo() {
         if (!readyFuts.isEmpty()) {
-            U.warn(log, "Pending affinity ready futures [cache=" + cacheName + "]:");
+            U.warn(log, "Pending affinity ready futures [cache=" + cacheName + ", lastVer=" + lastVersion() + "]:");
 
             for (AffinityReadyFuture fut : readyFuts.values())
                 U.warn(log, ">>> " + fut);
@@ -443,7 +443,7 @@ public class GridAffinityAssignmentCache {
 
             if (cache == null) {
                 throw new IllegalStateException("Getting affinity for topology version earlier than affinity is " +
-                    "calculated [locNodeId=" + ctx.localNodeId() +
+                    "calculated [locNode=" + ctx.discovery().localNode() +
                     ", cache=" + cacheName +
                     ", topVer=" + topVer +
                     ", head=" + head.get().topologyVersion() +
@@ -458,6 +458,53 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * @param part Partition.
+     * @param startVer Start version.
+     * @param endVer End version.
+     * @return {@code False} if start version is not in history or primary is the same up to end version, else {@code true}.
+     */
+    public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
+        GridAffinityAssignment aff = affCache.get(startVer);
+
+        if (aff == null)
+            return false;
+
+        List<ClusterNode> nodes = aff.get(part);
+
+        if (nodes.isEmpty())
+            return true;
+
+        ClusterNode primary = nodes.get(0);
+
+        for (GridAffinityAssignment assignment : affCache.tailMap(startVer, false).values()) {
+            List<ClusterNode> nodes0 = assignment.assignment().get(part);
+
+            if (nodes0.isEmpty())
+                return true;
+
+            if (!nodes0.get(0).equals(primary))
+                return true;
+
+            if (assignment.topologyVersion().equals(endVer))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param aff Affinity cache to copy the ideal assignment and the last version assignment from.
+     */
+    public void init(GridAffinityAssignmentCache aff) {
+        assert aff.lastVersion().compareTo(lastVersion()) >= 0;
+        assert aff.idealAssignment() != null;
+
+        idealAssignment(aff.idealAssignment());
+
+        initialize(aff.lastVersion(), aff.assignments(aff.lastVersion()));
+    }
+
+    /**
      * @param topVer Topology version to wait.
      */
     private void awaitTopologyVersion(AffinityTopologyVersion topVer) {
@@ -511,5 +558,10 @@ public class GridAffinityAssignmentCache {
 
             return done;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AffinityReadyFuture.class, this);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 8a0194c..6b289e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -320,7 +320,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
                 AffinityInfo info = new AffinityInfo(
                     cctx.config().getAffinity(),
                     cctx.config().getAffinityMapper(),
-                    new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer)),
+                    new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)),
                     cctx.cacheObjectContext());
 
                 IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info));

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
index f670960..2952ebc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityUtils.java
@@ -150,6 +150,7 @@ class GridAffinityUtils {
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Topology version.
          */
         private AffinityJob(@Nullable String cacheName, @NotNull AffinityTopologyVersion topVer) {
             this.cacheName = cacheName;
@@ -182,7 +183,7 @@ class GridAffinityUtils {
             return F.t(
                 affinityMessage(ctx, cctx.config().getAffinity()),
                 affinityMessage(ctx, cctx.config().getAffinityMapper()),
-                new GridAffinityAssignment(topVer, cctx.affinity().assignments(topVer)));
+                new GridAffinityAssignment(topVer, cctx.affinity().assignment(topVer)));
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
new file mode 100644
index 0000000..8cff65e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinityChangeMessage.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Discovery custom message carrying cache affinity assignment changes.
+ */
+public class CacheAffinityChangeMessage implements DiscoveryCustomMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID, generated randomly on creation. */
+    private IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Topology version; assigned only by the rebalance-finished constructor. */
+    private AffinityTopologyVersion topVer;
+
+    /** Exchange ID; assigned only by the exchange-finish constructor. */
+    private GridDhtPartitionExchangeId exchId;
+
+    /** Assignment change; presumably cache ID -> partition -> node IDs (TODO confirm). */
+    private Map<Integer, Map<Integer, List<UUID>>> assignmentChange;
+
+    /** Cache deployment IDs; presumably keyed by cache ID (TODO confirm). */
+    private Map<Integer, IgniteUuid> cacheDeploymentIds;
+
+    /** Partitions message; assigned only by the exchange-finish constructor. */
+    private GridDhtPartitionsFullMessage partsMsg;
+
+    /** {@code True} if request should trigger partition exchange (transient flag). */
+    private transient boolean exchangeNeeded;
+
+    /**
+     * Constructor used when message is created after cache rebalance finished.
+     *
+     * @param topVer Topology version.
+     * @param assignmentChange Assignment change.
+     * @param cacheDeploymentIds Cache deployment IDs.
+     */
+    public CacheAffinityChangeMessage(AffinityTopologyVersion topVer,
+        Map<Integer, Map<Integer, List<UUID>>> assignmentChange,
+        Map<Integer, IgniteUuid> cacheDeploymentIds) {
+        assert !F.isEmpty(assignmentChange) : assignmentChange;
+
+        this.topVer = topVer;
+        this.assignmentChange = assignmentChange;
+        this.cacheDeploymentIds = cacheDeploymentIds;
+    }
+
+    /**
+     * Constructor used when message is created to finish exchange.
+     *
+     * @param exchId Exchange ID.
+     * @param partsMsg Partitions message.
+     * @param assignmentChange Assignment change.
+     */
+    public CacheAffinityChangeMessage(GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionsFullMessage partsMsg,
+        Map<Integer, Map<Integer, List<UUID>>> assignmentChange) {
+        this.exchId = exchId;
+        this.partsMsg = partsMsg;
+        this.assignmentChange = assignmentChange;
+    }
+
+    /**
+     * @return Cache deployment IDs.
+     */
+    public Map<Integer, IgniteUuid> cacheDeploymentIds() {
+        return cacheDeploymentIds;
+    }
+
+    /**
+     * @return {@code True} if request should trigger partition exchange.
+     */
+    public boolean exchangeNeeded() {
+        return exchangeNeeded;
+    }
+
+    /**
+     * @param exchangeNeeded {@code True} if request should trigger partition exchange.
+     */
+    public void exchangeNeeded(boolean exchangeNeeded) {
+        this.exchangeNeeded = exchangeNeeded;
+    }
+
+    /**
+     * @return Partitions message.
+     */
+    public GridDhtPartitionsFullMessage partitionsMessage() {
+        return partsMsg;
+    }
+
+    /**
+     * @return Affinity assignment change.
+     */
+    @Nullable public Map<Integer, Map<Integer, List<UUID>>> assignmentChange() {
+        return assignmentChange;
+    }
+
+    /**
+     * @return Exchange ID, or {@code null} if message was not created to finish exchange.
+     */
+    @Nullable public GridDhtPartitionExchangeId exchangeId() {
+        return exchId;
+    }
+
+    /**
+     * @return Topology version, or {@code null} if message was created to finish exchange.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CacheAffinityChangeMessage.class, this);
+    }
+}


Mime
View raw message