ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [09/43] ignite git commit: ignite-324 Partition exchange: node should be assigned as primary only after preloading is finished. Implemented 'late affinity assignment', also fixes: - fixed BinaryObject/BinaryReaderExImpl to properly handle case when class
Date Tue, 12 Apr 2016 00:08:31 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 936e9cc..ff4ba6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -501,6 +501,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalCache<K, V> withNoRetries() {
+        CacheOperationContext opCtx = new CacheOperationContext(false, null, false, null, true, null);
+
+        return new GridCacheProxyImpl<>(ctx, this, opCtx);
+    }
+
+    /** {@inheritDoc} */
     @Override public CacheConfiguration configuration() {
         return ctx.config();
     }
@@ -2055,8 +2062,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         }
         else {
             return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
-                @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) {
-                    return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough, needVer);
+                @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                    return tx.getAllAsync(ctx,
+                        readyTopVer,
+                        keys,
+                        deserializeBinary,
+                        skipVals,
+                        false,
+                        !readThrough,
+                        needVer);
                 }
             }, ctx.operationContextPerCall());
         }
@@ -2088,7 +2102,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         V prevVal = syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                return (V)tx.putAsync(ctx, key, val, true, filter).get().value();
+                return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value();
             }
 
             @Override public String toString() {
@@ -2140,8 +2154,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         return asyncOp(new AsyncOp<V>() {
-            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
-                return tx.putAsync(ctx, key, val, true, filter)
+            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.putAsync(ctx, readyTopVer, key, val, true, filter)
                     .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
@@ -2178,7 +2192,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         Boolean stored = syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                return tx.putAsync(ctx, key, val, false, filter).get().success();
+                return tx.putAsync(ctx, null, key, val, false, filter).get().success();
             }
 
             @Override public String toString() {
@@ -2219,8 +2233,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
-        return asyncOp(new AsyncInOp(drMap.keySet()) {
-            @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+        return asyncOp(new AsyncOp(drMap.keySet()) {
+            @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 return tx.putAllDrAsync(ctx, drMap);
             }
 
@@ -2273,6 +2287,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     tx.topologyVersion(topVer);
 
                 IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx,
+                    null,
                     key,
                     (EntryProcessor<K, V, Object>)entryProcessor,
                     args);
@@ -2311,7 +2326,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                         }
                     });
 
-                IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, invokeMap, args);
+                IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, null, invokeMap, args);
 
                 Map<K, EntryProcessorResult<T>> res = fut.get().value();
 
@@ -2331,12 +2346,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKey(key);
 
-        IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() {
-            @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
+        IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() {
+            @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
                     Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
 
-                return tx.invokeAsync(ctx, invokeMap, args);
+                return tx.invokeAsync(ctx, readyTopVer, invokeMap, args);
             }
 
             @Override public String toString() {
@@ -2374,15 +2389,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(keys);
 
-        IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) {
-            @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
+        IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) {
+            @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
                     @Override public EntryProcessor apply(K k) {
                         return entryProcessor;
                     }
                 });
 
-                return tx.invokeAsync(ctx, invokeMap, args);
+                return tx.invokeAsync(ctx, readyTopVer, invokeMap, args);
             }
 
             @Override public String toString() {
@@ -2414,9 +2429,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(map.keySet());
 
-        IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(map.keySet()) {
-            @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
-                return tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
+        IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) {
+            @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.invokeAsync(ctx,
+                    readyTopVer,
+                    (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map,
+                    args);
             }
 
             @Override public String toString() {
@@ -2451,7 +2469,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
                 throws IgniteCheckedException {
                 IgniteInternalFuture<GridCacheReturn> fut =
-                    tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
+                    tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
 
                 return fut.get().value();
             }
@@ -2496,8 +2514,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         return asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
-                return tx.putAsync(ctx, key, val, false, filter).chain(
+            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.putAsync(ctx,
+                    readyTopVer,
+                    key,
+                    val,
+                    false,
+                    filter).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
@@ -2522,7 +2545,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         return syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                return (V)tx.putAsync(ctx, key, val, true, ctx.noVal()).get().value();
+                return (V)tx.putAsync(ctx, null, key, val, true, ctx.noVal()).get().value();
             }
 
             @Override public String toString() {
@@ -2543,8 +2566,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
-            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
-                return tx.putAsync(ctx, key, val, true, ctx.noVal())
+            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.noVal())
                     .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
@@ -2572,7 +2595,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         Boolean stored = syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                return tx.putAsync(ctx, key, val, false, ctx.noVal()).get().success();
+                return tx.putAsync(ctx, null, key, val, false, ctx.noVal()).get().success();
             }
 
             @Override public String toString() {
@@ -2598,8 +2621,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
-                return tx.putAsync(ctx, key, val, false, ctx.noVal()).chain(
+            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.putAsync(ctx,
+                    readyTopVer,
+                    key,
+                    val,
+                    false,
+                    ctx.noVal()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
@@ -2623,7 +2651,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         return syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                return (V)tx.putAsync(ctx, key, val, true, ctx.hasVal()).get().value();
+                return (V)tx.putAsync(ctx, null, key, val, true, ctx.hasVal()).get().value();
             }
 
             @Override public String toString() {
@@ -2644,8 +2672,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
-            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
-                return tx.putAsync(ctx, key, val, true, ctx.hasVal()).chain(
+            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.hasVal()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
@@ -2669,7 +2697,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         return syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                return tx.putAsync(ctx, key, val, false, ctx.hasVal()).get().success();
+                return tx.putAsync(ctx, null, key, val, false, ctx.hasVal()).get().success();
             }
 
             @Override public String toString() {
@@ -2686,8 +2714,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         return asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
-                return tx.putAsync(ctx, key, val, false, ctx.hasVal()).chain(
+            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.putAsync(ctx, readyTopVer, key, val, false, ctx.hasVal()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
             }
 
@@ -2710,7 +2738,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return tx.putAsync(ctx, key, newVal, false, ctx.equalsVal(oldVal)).get().success();
+                return tx.putAsync(ctx, null, key, newVal, false, ctx.equalsVal(oldVal)).get()
+                    .success();
             }
 
             @Override public String toString() {
@@ -2731,7 +2760,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 // Register before hiding in the filter.
                 if (ctx.deploymentEnabled()) {
                     try {
@@ -2742,7 +2771,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     }
                 }
 
-                return tx.putAsync(ctx, key, newVal, false, ctx.equalsVal(oldVal)).chain(
+                return tx.putAsync(ctx, readyTopVer, key, newVal, false, ctx.equalsVal(oldVal)).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
@@ -2771,7 +2800,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         syncOp(new SyncInOp(m.size() == 1) {
             @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                tx.putAllAsync(ctx, m, false).get();
+                tx.putAllAsync(ctx, null, m, false).get();
             }
 
             @Override public String toString() {
@@ -2791,9 +2820,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(m.keySet());
 
-        return asyncOp(new AsyncInOp(m.keySet()) {
-            @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
-                return tx.putAllAsync(ctx, m, false).chain(RET2NULL);
+        return asyncOp(new AsyncOp(m.keySet()) {
+            @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.putAllAsync(ctx,
+                    readyTopVer,
+                    m,
+                    false).chain(RET2NULL);
             }
 
             @Override public String toString() {
@@ -2816,6 +2848,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         V prevVal = syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
                 V ret = tx.removeAllAsync(ctx,
+                    null,
                     Collections.singletonList(key),
                     /*retval*/true,
                     null,
@@ -2850,9 +2883,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
-            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+            @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 // TODO should we invoke interceptor here?
                 return tx.removeAllAsync(ctx,
+                    readyTopVer,
                     Collections.singletonList(key),
                     /*retval*/true,
                     null,
@@ -2903,6 +2937,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         syncOp(new SyncInOp(keys.size() == 1) {
             @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
                 tx.removeAllAsync(ctx,
+                    null,
                     keys,
                     /*retval*/false,
                     null,
@@ -2930,9 +2965,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (keyCheck)
             validateCacheKeys(keys);
 
-        IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) {
-            @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+        IgniteInternalFuture<Object> fut = asyncOp(new AsyncOp(keys) {
+            @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 return tx.removeAllAsync(ctx,
+                    readyTopVer,
                     keys,
                     /*retval*/false,
                     null,
@@ -2964,6 +3000,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         boolean rmv = syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
                 return tx.removeAllAsync(ctx,
+                    null,
                     Collections.singletonList(key),
                     /*retval*/false,
                     null,
@@ -3004,8 +3041,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 return tx.removeAllAsync(ctx,
+                    readyTopVer,
                     Collections.singletonList(key),
                     /*retval*/false,
                     filter,
@@ -3034,7 +3072,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         syncOp(new SyncInOp(false) {
             @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
-                tx.removeAllDrAsync(ctx, drMap).get();
+                tx.removeAllDrAsync(ctx, (Map)drMap).get();
             }
 
             @Override public String toString() {
@@ -3051,9 +3089,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
 
-        return asyncOp(new AsyncInOp(drMap.keySet()) {
-            @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
-                return tx.removeAllDrAsync(ctx, drMap);
+        return asyncOp(new AsyncOp(drMap.keySet()) {
+            @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+                return tx.removeAllDrAsync(ctx, (Map)drMap);
             }
 
             @Override public String toString() {
@@ -3080,6 +3118,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                     ctx.deploy().registerClass(val);
 
                 return tx.removeAllAsync(ctx,
+                    null,
                     Collections.singletonList(key),
                     /*retval*/false,
                     ctx.equalsVal(val),
@@ -3109,7 +3148,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             validateCacheKey(key);
 
         IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
-            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+            @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
                 // Register before hiding in the filter.
                 if (ctx.deploymentEnabled()) {
                     try {
@@ -3121,6 +3160,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 }
 
                 return tx.removeAllAsync(ctx,
+                    readyTopVer,
                     Collections.singletonList(key),
                     /*retval*/false,
                     ctx.equalsVal(val),
@@ -3280,7 +3320,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         try {
             KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
-            GridCacheEntryEx e = entry0(cacheKey, new AffinityTopologyVersion(ctx.discovery().topologyVersion()),
+            GridCacheEntryEx e = entry0(cacheKey, ctx.discovery().topologyVersionEx(),
                 false, false);
 
             if (e == null)
@@ -4257,31 +4297,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                                 return new GridFinishedFuture<>(
                                     new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
 
-                            ctx.operationContextPerCall(opCtx);
-
-                            try {
-                                return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
-                                    @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
-                                        try {
-                                            return tFut.get();
-                                        }
-                                        catch (IgniteTxRollbackCheckedException e) {
-                                            throw e;
-                                        }
-                                        catch (IgniteCheckedException e1) {
-                                            tx0.rollbackAsync();
+                            return op.op(tx0, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() {
+                                @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
+                                    try {
+                                        return tFut.get();
+                                    }
+                                    catch (IgniteTxRollbackCheckedException e) {
+                                        throw e;
+                                    }
+                                    catch (IgniteCheckedException e1) {
+                                        tx0.rollbackAsync();
 
-                                            throw e1;
-                                        }
-                                        finally {
-                                            ctx.shared().txContextReset();
-                                        }
+                                        throw e1;
                                     }
-                                });
-                            }
-                            finally {
-                                ctx.operationContextPerCall(null);
-                            }
+                                    finally {
+                                        ctx.shared().txContextReset();
+                                    }
+                                }
+                            });
                         }
                     });
 
@@ -4290,7 +4323,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 return f;
             }
 
-            final IgniteInternalFuture<T> f = op.op(tx).chain(new CX1<IgniteInternalFuture<T>, T>() {
+            final IgniteInternalFuture<T> f = op.op(tx, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() {
                 @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
                     try {
                         return tFut.get();
@@ -5117,40 +5150,95 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         /**
          * @param tx Transaction.
-         * @return Operation return value.
+         * @param readyTopVer Ready topology version.
+         * @return Operation future.
          */
-        public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx);
-    }
+        public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer);
 
-    /**
-     * Cache operation.
-     */
-    private abstract class AsyncInOp extends AsyncOp<Object> {
         /**
-         *
+         * @param tx Transaction.
+         * @param opCtx Operation context.
+         * @return Operation future.
          */
-        protected AsyncInOp() {
-            super();
+        public IgniteInternalFuture<T> op(final IgniteTxLocalAdapter tx, CacheOperationContext opCtx) {
+            AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
+
+            if (txTopVer != null)
+                return op(tx, (AffinityTopologyVersion)null);
+
+            // Tx needs affinity for entry creation, wait when affinity is ready to avoid blocking inside async operation.
+            final AffinityTopologyVersion topVer = ctx.shared().exchange().topologyVersion();
+
+            IgniteInternalFuture<?> topFut = ctx.shared().exchange().affinityReadyFuture(topVer);
+
+            if (topFut == null || topFut.isDone())
+                return op(tx, topVer);
+            else
+                return waitTopologyFuture(topFut, topVer, tx, opCtx);
         }
 
         /**
-         * @param keys Keys involved.
+         * @param topFut Topology future.
+         * @param topVer Topology version to use.
+         * @param tx Transaction.
+         * @param opCtx Operation context.
+         * @return Operation future.
          */
-        protected AsyncInOp(Collection<?> keys) {
-            super(keys);
-        }
+        protected IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut,
+            final AffinityTopologyVersion topVer,
+            final IgniteTxLocalAdapter tx,
+            final CacheOperationContext opCtx) {
+            final GridFutureAdapter fut0 = new GridFutureAdapter();
+
+            topFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> topFut) {
+                    try {
+                        topFut.get();
 
-        /** {@inheritDoc} */
-        @SuppressWarnings({"unchecked"})
-        @Override public final IgniteInternalFuture<Object> op(IgniteTxLocalAdapter tx) {
-            return (IgniteInternalFuture<Object>)inOp(tx);
+                        IgniteInternalFuture<?> opFut = runOp(tx, topVer, opCtx);
+
+                        opFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> opFut) {
+                                try {
+                                    fut0.onDone(opFut.get());
+                                }
+                                catch (IgniteCheckedException e) {
+                                    fut0.onDone(e);
+                                }
+                            }
+                        });
+                    }
+                    catch (IgniteCheckedException e) {
+                        fut0.onDone(e);
+                    }
+                }
+            });
+
+            return fut0;
         }
 
         /**
          * @param tx Transaction.
-         * @return Operation return value.
+         * @param topVer Ready topology version.
+         * @param opCtx Operation context.
+         * @return Future.
          */
-        public abstract IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx);
+        private IgniteInternalFuture<T> runOp(IgniteTxLocalAdapter tx,
+            AffinityTopologyVersion topVer,
+            CacheOperationContext opCtx) {
+            ctx.operationContextPerCall(opCtx);
+
+            ctx.shared().txContextReset();
+
+            try {
+                return op(tx, topVer);
+            }
+            finally {
+                ctx.shared().txContextReset();
+
+                ctx.operationContextPerCall(null);
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 21975da..e264043 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -43,7 +45,7 @@ import java.util.UUID;
  */
 public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     /** */
-    private static final AffinityTopologyVersion TOP_FIRST = new AffinityTopologyVersion(1);
+    private static final AffinityTopologyVersion LOC_CACHE_TOP_VER = new AffinityTopologyVersion(1);
 
     /** */
     public static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " +
@@ -52,25 +54,45 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     /** Affinity cached function. */
     private GridAffinityAssignmentCache aff;
 
+    /** */
+    private AffinityFunction affFunction;
+
+    /** */
+    private AffinityKeyMapper affMapper;
+
     /** {@inheritDoc} */
     @Override public void start0() throws IgniteCheckedException {
-        aff = new GridAffinityAssignmentCache(cctx, cctx.namex(), cctx.config().getAffinity(),
-            cctx.config().getAffinityMapper(), cctx.config().getBackups());
+        affFunction = cctx.config().getAffinity();
+        affMapper = cctx.config().getAffinityMapper();
+
+        aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
+            cctx.namex(),
+            affFunction,
+            cctx.config().getNodeFilter(),
+            cctx.config().getBackups(),
+            cctx.isLocal());
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStart0() throws IgniteCheckedException {
         if (cctx.isLocal())
             // No discovery event needed for local affinity.
-            aff.calculate(TOP_FIRST, null);
+            aff.calculate(LOC_CACHE_TOP_VER, null);
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
+        cancelFutures();
+    }
+
+    /**
+     * Calls {@code cancelFutures} on the affinity cache with a "cache (or node) is stopping" error.
+     */
+    public void cancelFutures() {
         IgniteCheckedException err =
             new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
 
-        aff.onKernalStop(err);
+        aff.cancelFutures(err);
     }
 
     /** {@inheritDoc} */
@@ -78,7 +100,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
         IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
             "Failed to wait for topology update, client disconnected.");
 
-        aff.onKernalStop(err);
+        aff.cancelFutures(err);
     }
 
     /**
@@ -144,52 +166,23 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * Initializes affinity for joined node.
-     *
-     * @param topVer Topology version.
-     * @param affAssignment Affinity assignment for this topology version.
-     */
-    public void initializeAffinity(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) {
-        assert !cctx.isLocal();
-
-        aff.initialize(topVer, affAssignment);
-    }
-
-    /**
      * @param topVer Topology version.
      * @return Affinity assignments.
      */
     public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = new AffinityTopologyVersion(1);
+            topVer = LOC_CACHE_TOP_VER;
 
         return aff.assignments(topVer);
     }
 
     /**
-     * Calculates affinity cache for given topology version.
-     *
-     * @param topVer Topology version to calculate affinity for.
-     * @param discoEvt Discovery event that causes this topology change.
-     * @return Affinity assignments.
+     * @return Assignment.
      */
-    public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
+    public List<List<ClusterNode>> idealAssignment() {
         assert !cctx.isLocal();
 
-        return aff.calculate(topVer, discoEvt);
-    }
-
-    /**
-     * Copies previous affinity assignment when discovery event does not cause affinity assignment changes
-     * (e.g. client node joins on leaves).
-     *
-     * @param evt Event.
-     * @param topVer Topology version.
-     */
-    public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
-        assert !cctx.isLocal();
-
-        aff.clientEventTopologyChange(evt, topVer);
+        return aff.idealAssignment();
     }
 
     /**
@@ -218,7 +211,21 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
         if (aff0 == null)
             throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name());
 
-        return aff0.partition(key);
+        return affFunction.partition(affinityKey(key));
+    }
+
+    /**
+     * If key is a {@link GridCacheInternal GridCacheInternal} entry, it won't be passed into the user's mapper;
+     * the {@link GridCacheDefaultAffinityKeyMapper default} mapper is used instead.
+     *
+     * @param key Key.
+     * @return Affinity key.
+     */
+    private Object affinityKey(Object key) {
+        if (key instanceof CacheObject && !(key instanceof BinaryObject))
+            key = ((CacheObject)key).value(cctx.cacheObjectContext(), false);
+
+        return (key instanceof GridCacheInternal ? cctx.defaultAffMapper() : affMapper).affinityKey(key);
     }
 
     /**
@@ -237,7 +244,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      */
     public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = new AffinityTopologyVersion(1);
+            topVer = LOC_CACHE_TOP_VER;
 
         GridAffinityAssignmentCache aff0 = aff;
 
@@ -250,12 +257,12 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
     /**
      * Get affinity assignment for the given topology version.
      *
-     * @param topVer Toplogy version.
-     * @return Affinity affignment.
+     * @param topVer Topology version.
+     * @return Affinity assignment.
      */
     public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = new AffinityTopologyVersion(1);
+            topVer = LOC_CACHE_TOP_VER;
 
         GridAffinityAssignmentCache aff0 = aff;
 
@@ -401,7 +408,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      */
     public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = new AffinityTopologyVersion(1);
+            topVer = LOC_CACHE_TOP_VER;
 
         GridAffinityAssignmentCache aff0 = aff;
 
@@ -418,7 +425,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
      */
     public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) {
         if (cctx.isLocal())
-            topVer = new AffinityTopologyVersion(1);
+            topVer = LOC_CACHE_TOP_VER;
 
         GridAffinityAssignmentCache aff0 = aff;
 
@@ -449,4 +456,28 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
         if (aff0 != null)
             aff0.dumpDebugInfo();
     }
+
+    /**
+     * @return Affinity cache.
+     */
+    public GridAffinityAssignmentCache affinityCache() {
+        return aff;
+    }
+
+    /**
+     * @param part Partition.
+     * @param startVer Start version.
+     * @param endVer End version.
+     * @return {@code True} if primary changed or required affinity version not found in history.
+     */
+    public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) {
+        assert !cctx.isLocal() : cctx.name();
+
+        GridAffinityAssignmentCache aff0 = aff;
+
+        if (aff0 == null)
+            throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name());
+
+        return aff0.primaryChanged(part, startVer, endVer);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
index ad8ade1..ffce82d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index d159afa..88d6e04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1260,7 +1260,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     /**
      * Sets thread local cache operation context.
      *
-     * @param opCtx Flags to set.
+     * @param opCtx Operation context.
      */
     public void operationContextPerCall(@Nullable CacheOperationContext opCtx) {
         if (nearContext())

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
index b1a57a9..a21e18b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java
@@ -1474,9 +1474,16 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
 
                 ClusterNode loc = cctx.localNode();
 
+                AffinityTopologyVersion initTopVer =
+                    new AffinityTopologyVersion(cctx.discovery().localJoinEvent().topologyVersion(), 0);
+
+                AffinityTopologyVersion cacheStartVer = cctx.startTopologyVersion();
+
+                if (cacheStartVer != null && cacheStartVer.compareTo(initTopVer) > 0)
+                    initTopVer = cacheStartVer;
+
                 // Initialize.
-                primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(),
-                    cctx.affinity().affinityTopologyVersion()));
+                primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(), initTopVer));
 
                 while (!isCancelled()) {
                     DiscoveryEvent evt = evts.take();
@@ -1484,12 +1491,14 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                     if (log.isDebugEnabled())
                         log.debug("Processing event: " + evt);
 
+                    AffinityTopologyVersion topVer = new AffinityTopologyVersion(evt.topologyVersion());
+
                     // Remove partitions that are no longer primary.
                     for (Iterator<Integer> it = primaryParts.iterator(); it.hasNext();) {
                         if (!evts.isEmpty())
                             break;
 
-                        if (!cctx.affinity().primary(loc, it.next(), new AffinityTopologyVersion(evt.topologyVersion())))
+                        if (!cctx.affinity().primary(loc, it.next(), topVer))
                             it.remove();
                     }
 
@@ -1501,8 +1510,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter {
                         if (!evts.isEmpty())
                             break;
 
-                        if (part.primary(new AffinityTopologyVersion(evt.topologyVersion()))
-                            && primaryParts.add(part.id())) {
+                        if (part.primary(topVer) && primaryParts.add(part.id())) {
                             if (log.isDebugEnabled())
                                 log.debug("Touching partition entries: " + part);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b297827..aab1bcc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -128,16 +128,33 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
 
                     AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
 
-                    assert cacheMsg.topologyVersion().compareTo(startTopVer) > 0 :
-                        "Invalid affinity request [startTopVer=" + startTopVer + ", msg=" + cacheMsg + ']';
+                    DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId());
 
-                    // Need to wait for initial exchange to avoid race between cache start and affinity request.
+                    if (cacheDesc != null) {
+                        if (cacheDesc.startTopologyVersion() != null)
+                            startTopVer = cacheDesc.startTopologyVersion();
+                        else if (cacheDesc.receivedFromStartVersion() != null)
+                            startTopVer = cacheDesc.receivedFromStartVersion();
+                    }
+
+                    // Need to wait for exchange to avoid race between cache start and affinity request.
                     fut = cctx.exchange().affinityReadyFuture(startTopVer);
 
                     if (fut != null && !fut.isDone()) {
-                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
-                            @Override public void run() {
-                                lsnr.onMessage(nodeId, cacheMsg);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Wait for exchange before processing message [msg=" + msg +
+                                ", node=" + nodeId +
+                                ", waitVer=" + startTopVer +
+                                ", cacheDesc=" + cacheDesc + ']');
+                        }
+
+                        fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                                    @Override public void run() {
+                                       handleMessage(nodeId, cacheMsg);
+                                    }
+                                });
                             }
                         });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 511eed4..cc1a8d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1720,7 +1720,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     // Must persist inside synchronization in non-tx mode.
                     cctx.store().put(null, key, updated, ver);
 
-
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
                 updateIndex(updated, expireTime, ver, old);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
index aba8318..f1c1b83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java
@@ -29,11 +29,11 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -602,7 +602,13 @@ public class GridCacheMvccCandidate implements Externalizable,
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         IgniteUtils.writeUuid(out, nodeId);
 
-        CU.writeVersion(out, ver);
+        out.writeBoolean(ver == null);
+
+        if (ver != null) {
+            out.writeBoolean(ver instanceof GridCacheVersionEx);
+
+            ver.writeExternal(out);
+        }
 
         out.writeLong(timeout);
         out.writeLong(threadId);
@@ -614,7 +620,11 @@ public class GridCacheMvccCandidate implements Externalizable,
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         nodeId = IgniteUtils.readUuid(in);
 
-        ver = CU.readVersion(in);
+        if (!in.readBoolean()) {
+            ver = in.readBoolean() ? new GridCacheVersionEx() : new GridCacheVersion();
+
+            ver.readExternal(in);
+        }
 
         timeout = in.readLong();
         threadId = in.readLong();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 462c511..0d9f174 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -110,7 +110,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private static final int EXCHANGE_HISTORY_SIZE = 1000;
 
     /** Cleanup history size. */
-    public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100);
+    public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500);
 
     /** Atomic reference for pending timeout object. */
     private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
@@ -189,8 +189,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
                         assert cctx.discovery().node(n.id()) == null;
 
-                        for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
-                            f.onNodeLeft(n.id());
+                        // Avoid race b/w initial future add and discovery event.
+                        GridDhtPartitionsExchangeFuture initFut = null;
+
+                        if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) {
+                            initFut = exchangeFuture(initialExchangeId(), null, null, null);
+
+                            initFut.onNodeLeft(n);
+                        }
+
+                        for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) {
+                            if (f != initFut)
+                                f.onNodeLeft(n);
+                        }
                     }
 
                     assert
@@ -202,7 +213,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         affinityTopologyVersion(e),
                         e.type());
 
-                    exchFut = exchangeFuture(exchId, e, null);
+                    exchFut = exchangeFuture(exchId, e, null, null);
                 }
                 else {
                     DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
@@ -237,9 +248,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         if (!F.isEmpty(valid)) {
                             exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
 
-                            exchFut = exchangeFuture(exchId, e, valid);
+                            exchFut = exchangeFuture(exchId, e, valid, null);
                         }
                     }
+                    else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+
+                        if (msg.exchangeId() == null) {
+                            if (msg.exchangeNeeded()) {
+                                exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type());
+
+                                exchFut = exchangeFuture(exchId, e, null, msg);
+                            }
+                        }
+                        else
+                            exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                    }
                 }
 
                 if (exchId != null) {
@@ -301,6 +325,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         return reconnectExchangeFut;
     }
 
+    /**
+     * @return Initial exchange ID.
+     */
+    private GridDhtPartitionExchangeId initialExchangeId() {
+        DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
+
+        assert discoEvt != null;
+
+        final AffinityTopologyVersion startTopVer = affinityTopologyVersion(discoEvt);
+
+        assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
+
+        return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED);
+    }
+
     /** {@inheritDoc} */
     @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
         super.onKernalStart0(reconnect);
@@ -314,15 +353,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         // Generate dummy discovery event for local node joining.
         DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent();
 
-        final AffinityTopologyVersion startTopVer = affinityTopologyVersion(discoEvt);
-
-        GridDhtPartitionExchangeId exchId = exchangeId(loc.id(), startTopVer, EVT_NODE_JOINED);
-
-        assert discoEvt != null;
-
-        assert discoEvt.topologyVersion() == startTopVer.topologyVersion();
+        GridDhtPartitionExchangeId exchId = initialExchangeId();
 
-        GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null);
+        GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null);
 
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
@@ -339,14 +372,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             return;
 
                         try {
-                            if (m instanceof GridDhtPartitionSupplyMessageV2)
-                                cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage(
-                                    idx, id, (GridDhtPartitionSupplyMessageV2)m);
-                            else if (m instanceof GridDhtPartitionDemandMessage)
-                                cctx.cacheContext(m.cacheId).preloader().handleDemandMessage(
-                                    idx, id, (GridDhtPartitionDemandMessage)m);
-                            else
-                                log.error("Unsupported message type: " + m.getClass().getName());
+                            GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId);
+
+                            if (cacheCtx != null) {
+                                if (m instanceof GridDhtPartitionSupplyMessageV2)
+                                    cacheCtx.preloader().handleSupplyMessage(
+                                        idx, id, (GridDhtPartitionSupplyMessageV2)m);
+                                else if (m instanceof GridDhtPartitionDemandMessage)
+                                    cacheCtx.preloader().handleDemandMessage(
+                                        idx, id, (GridDhtPartitionDemandMessage)m);
+                                else
+                                    U.error(log, "Unsupported message type: " + m.getClass().getName());
+                            }
                         }
                         finally {
                             leaveBusy();
@@ -708,28 +745,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * Refresh partitions.
-     *
-     * @param timeout Timeout.
-     */
-    private void refreshPartitions(long timeout) {
-        long last = lastRefresh.get();
-
-        long now = U.currentTimeMillis();
-
-        if (last != -1 && now - last >= timeout && lastRefresh.compareAndSet(last, now)) {
-            if (log.isDebugEnabled())
-                log.debug("Refreshing partitions [last=" + last + ", now=" + now + ", delta=" + (now - last) +
-                    ", timeout=" + timeout + ", lastRefresh=" + lastRefresh + ']');
-
-            refreshPartitions();
-        }
-        else if (log.isDebugEnabled())
-            log.debug("Partitions were not refreshed [last=" + last + ", now=" + now + ", delta=" + (now - last) +
-                ", timeout=" + timeout + ", lastRefresh=" + lastRefresh + ']');
-    }
-
-    /**
      * @param nodes Nodes.
      * @return {@code True} if message was sent, {@code false} if node left grid.
      */
@@ -838,20 +853,26 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
      * @param reqs Cache change requests.
+     * @param affChangeMsg Affinity change message.
      * @return Exchange future.
      */
     GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
-        @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs) {
+        @Nullable DiscoveryEvent discoEvt,
+        @Nullable Collection<DynamicCacheChangeRequest> reqs,
+        @Nullable CacheAffinityChangeMessage affChangeMsg) {
         GridDhtPartitionsExchangeFuture fut;
 
         GridDhtPartitionsExchangeFuture old = exchFuts.addx(
-            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
+            fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg));
 
         if (old != null) {
             fut = old;
 
             if (reqs != null)
                 fut.cacheChangeRequests(reqs);
+
+            if (affChangeMsg != null)
+                fut.affinityChangeMessage(affChangeMsg);
         }
 
         if (discoEvt != null)
@@ -892,7 +913,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer));
 
-        for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet())
+        AffinityTopologyVersion histVer = new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0);
+
+        for (AffinityTopologyVersion oldVer : nodeVers.headMap(histVer).keySet())
             nodeVers.remove(oldVer);
 
         if (err == null) {
@@ -940,11 +963,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 skipped++;
 
                 if (skipped == EXCH_FUT_CLEANUP_HISTORY_SIZE) {
-                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                        if (err == null) {
+                    if (err == null) {
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                             if (!cacheCtx.isLocal())
                                 cacheCtx.affinity().cleanUpCache(fut.topologyVersion());
                         }
+
+                        cctx.affinity().cleanUpCache(fut.topologyVersion());
                     }
                 }
 
@@ -1006,7 +1031,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     refreshPartitions();
             }
             else
-                exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+                exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
         }
         finally {
             leaveBusy();
@@ -1047,8 +1072,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     else if (!cacheCtx.isLocal())
                         top = cacheCtx.topology();
 
-                    if (top != null)
+                    if (top != null) {
                         updated |= top.update(null, entry.getValue(), null) != null;
+
+                        cctx.affinity().checkRebalanceState(top, cacheId);
+                    }
                 }
 
                 if (updated)
@@ -1058,17 +1086,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (msg.client()) {
                     final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
                         null,
+                        null,
                         null);
 
                     exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
                             // Finished future should reply only to sender client node.
-                            exchFut.onReceive(node.id(), msg);
+                            exchFut.onReceive(node, msg);
                         }
                     });
                 }
                 else
-                    exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+                    exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg);
             }
         }
         finally {
@@ -1127,26 +1156,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
 
-        dumpPendingObjects();
+        dumpPendingObjects(null);
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts())
             cacheCtx.preloader().dumpDebugInfo();
 
+        cctx.affinity().dumpDebugInfo();
+
         // Dump IO manager statistics.
         cctx.gridIO().dumpStats();
     }
 
     /**
-     *
+     * @param exchTopVer Exchange topology version, or {@code null} to omit the per-transaction exchange wait flag.
      */
-    public void dumpPendingObjects() {
+    public void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) {
         IgniteTxManager tm = cctx.tm();
 
         if (tm != null) {
             U.warn(log, "Pending transactions:");
 
-            for (IgniteInternalTx tx : tm.activeTransactions())
-                U.warn(log, ">>> " + tx);
+            for (IgniteInternalTx tx : tm.activeTransactions()) {
+                if (exchTopVer != null) {
+                    U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() +
+                        ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) +
+                        ", tx=" + tx + ']');
+                }
+                else
+                    U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
+            }
         }
 
         GridCacheMvccManager mvcc = cctx.mvcc();
@@ -1430,7 +1468,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                         if (marshR != null || !rebalanceQ.isEmpty()) {
                             if (futQ.isEmpty()) {
-                                U.log(log, "Rebalancing required" +
+                                U.log(log, "Rebalancing required " +
                                     "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
                                     ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index be019fc..acd8c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -73,18 +73,6 @@ public interface GridCachePreloader {
     public void onInitialExchangeComplete(@Nullable Throwable err);
 
     /**
-     * Callback by exchange manager when new exchange future is added to worker.
-     */
-    public void onExchangeFutureAdded();
-
-    /**
-     * Updates last exchange future.
-     *
-     * @param lastFut Last future.
-     */
-    public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut);
-
-    /**
      * @param exchFut Exchange future to assign.
      * @return Assignments or {@code null} if detected that there are pending exchanges.
      */
@@ -192,11 +180,9 @@ public interface GridCachePreloader {
     public void evictPartitionAsync(GridDhtLocalPartition part);
 
     /**
-     * Handles new topology.
-     *
-     * @param topVer Topology version.
+     * @param lastFut Last exchange future.
      */
-    public void onTopologyChanged(AffinityTopologyVersion topVer);
+    public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut);
 
     /**
      * Dumps debug information.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 5d98c6f..e393421 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -159,16 +159,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void onExchangeFutureAdded() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
     @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         return null;
     }
@@ -185,7 +175,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
+    @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 67d6a6c..38f861b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -104,6 +104,7 @@ import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -138,6 +139,7 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS;
 import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
 import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
 import static org.apache.ignite.configuration.DeploymentMode.SHARED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.internal.IgniteComponentType.JTA;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG;
@@ -645,6 +647,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             desc.locallyConfigured(true);
             desc.staticallyConfigured(true);
+            desc.receivedFrom(ctx.localNodeId());
 
             if (!template) {
                 registeredCaches.put(masked, desc);
@@ -757,10 +760,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 if (ctx.config().isDaemon() && !CU.isMarshallerCache(desc.cacheConfiguration().getName()))
                     continue;
 
-                boolean started = desc.onStart();
-
-                assert started : "Failed to change started flag for locally configured cache: " + desc;
-
                 desc.clearRemoteConfigurations();
 
                 CacheConfiguration ccfg = desc.cacheConfiguration();
@@ -770,6 +769,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 boolean loc = desc.locallyConfigured();
 
                 if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
+                    boolean started = desc.onStart();
+
+                    assert started : "Failed to change started flag for locally configured cache: " + desc;
+
                     CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
                     CachePluginManager pluginMgr = desc.pluginManager();
@@ -890,6 +893,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         // No new caches should be added after this point.
         exch.onKernalStop(cancel);
 
+        for (GridCacheAdapter<?, ?> cache : caches.values()) {
+            GridCacheAffinityManager aff = cache.context().affinity();
+
+            if (aff != null)
+                aff.cancelFutures();
+        }
+
         for (String cacheName : stopSeq) {
             GridCacheAdapter<?, ?> cache = caches.remove(maskNull(cacheName));
 
@@ -1287,12 +1297,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         storeMgr.initialize(cfgStore, sesHolders);
 
+        boolean affNode = CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter());
+
         GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
             ctx,
             sharedCtx,
             cfg,
             cacheType,
-            ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+            affNode,
             updatesAllowed,
 
             /*
@@ -1421,7 +1433,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 sharedCtx,
                 cfg,
                 cacheType,
-                ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+                affNode,
                 true,
 
                 /*
@@ -1531,7 +1543,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             },
             new IgnitePredicate<DynamicCacheDescriptor>() {
                 @Override public boolean apply(DynamicCacheDescriptor desc) {
-                    return desc.started() && desc.cacheType().userCache();
+                    return desc.cacheType().userCache();
                 }
             }
         );
@@ -1549,42 +1561,57 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param reqs Requests to start.
+     * @param req Cache start request.
      * @param topVer Topology version.
-     * @throws IgniteCheckedException If failed to start cache.
+     * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings("TypeMayBeWeakened")
-    public void prepareCachesStart(
-        Collection<DynamicCacheChangeRequest> reqs,
-        AffinityTopologyVersion topVer
-    ) throws IgniteCheckedException {
-        if (ctx.isDaemon())
-            return;
+    public void prepareCacheStart(DynamicCacheChangeRequest req, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+        assert req.start() : req;
+        assert req.cacheType() != null : req;
+
+        prepareCacheStart(
+            req.startCacheConfiguration(),
+            req.nearCacheConfiguration(),
+            req.cacheType(),
+            req.clientStartOnly(),
+            req.initiatingNodeId(),
+            req.deploymentId(),
+            topVer
+        );
 
-        for (DynamicCacheChangeRequest req : reqs) {
-            assert req.start() : req;
-            assert req.cacheType() != null : req;
-
-            prepareCacheStart(
-                req.startCacheConfiguration(),
-                req.nearCacheConfiguration(),
-                req.cacheType(),
-                req.clientStartOnly(),
-                req.initiatingNodeId(),
-                req.deploymentId(),
-                topVer
-            );
+        DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
 
-            DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+        if (desc != null)
+            desc.onStart();
+    }
 
-            if (desc != null)
-                desc.onStart();
-        }
+    /**
+     * Starts statically configured caches received from remote nodes during exchange.
+     *
+     * @param topVer Topology version.
+     * @throws IgniteCheckedException If failed.
+     * @return Descriptors of caches started by this call, or {@code null} if none were started.
+     */
+    public Collection<DynamicCacheDescriptor> startReceivedCaches(AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
+        List<DynamicCacheDescriptor> started = null;
 
-        // Start statically configured caches received from remote nodes during exchange.
         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-            if (desc.staticallyConfigured() && !desc.locallyConfigured()) {
+            if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) {
+                if (desc.receivedFrom() != null) {
+                    AffinityTopologyVersion startVer = desc.receivedFromStartVersion();
+
+                    if (startVer == null || startVer.compareTo(topVer) > 0)
+                        continue;
+                }
+
                 if (desc.onStart()) {
+                    if (started == null)
+                        started = new ArrayList<>();
+
+                    started.add(desc);
+
                     prepareCacheStart(
                         desc.cacheConfiguration(),
                         null,
@@ -1597,6 +1624,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 }
             }
         }
+
+        return started;
     }
 
     /**
@@ -1786,6 +1815,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param kernalCtx Kernal context.
      * @param storeSesLsnrs Store session listeners.
      * @return Shared context.
+     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
     private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
@@ -1796,6 +1826,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         GridCacheDeploymentManager depMgr = new GridCacheDeploymentManager();
         GridCachePartitionExchangeManager exchMgr = new GridCachePartitionExchangeManager();
         GridCacheIoManager ioMgr = new GridCacheIoManager();
+        CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager();
 
         CacheJtaManagerAdapter jta = JTA.createOptional();
 
@@ -1806,6 +1837,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             mvccMgr,
             depMgr,
             exchMgr,
+            topMgr,
             ioMgr,
             jta,
             storeSesLsnrs
@@ -1845,6 +1877,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 req.deploymentId(desc.deploymentId());
 
+                req.receivedFrom(desc.receivedFrom());
+
                 reqs.add(req);
 
                 Boolean nearEnabled = cache.isNear();
@@ -1868,6 +1902,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 req.deploymentId(desc.deploymentId());
 
+                req.receivedFrom(desc.receivedFrom());
+
                 reqs.add(req);
             }
 
@@ -1943,9 +1979,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         CacheConfiguration ccfg = req.startCacheConfiguration();
 
                         if (existing != null) {
-                            if (existing.locallyConfigured()) {
+                            if (joiningNodeId.equals(ctx.localNodeId())) {
+                                existing.receivedFrom(req.receivedFrom());
+
                                 existing.deploymentId(req.deploymentId());
+                            }
 
+                            if (existing.locallyConfigured()) {
                                 existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration());
 
                                 ctx.discovery().setCacheFilter(
@@ -1972,6 +2012,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             if (joiningNodeId.equals(ctx.localNodeId()))
                                 desc.receivedOnDiscovery(true);
 
+                            desc.receivedFrom(req.receivedFrom());
+
                             DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
 
                             assert old == null : old;
@@ -2442,7 +2484,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Callback invoked from discovery thread when discovery custom message  is received.
+     * @param type Event type.
+     * @param node Event node.
+     * @param topVer Topology version.
+     */
+    public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) {
+        if (type == EVT_NODE_JOINED) {
+            for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) {
+                if (node.id().equals(cacheDesc.receivedFrom()))
+                    cacheDesc.receivedFromStartVersion(topVer);
+            }
+        }
+
+        sharedCtx.affinity().onDiscoveryEvent(type, node, topVer);
+    }
+
+    /**
+     * Callback invoked from discovery thread when discovery custom message is received.
      *
      * @param msg Customer message.
      * @param topVer Current topology version.
@@ -2450,6 +2508,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     public boolean onCustomEvent(DiscoveryCustomMessage msg,
         AffinityTopologyVersion topVer) {
+        if (msg instanceof CacheAffinityChangeMessage)
+            return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg));
+
         return msg instanceof DynamicCacheChangeBatch && onCacheChangeRequested((DynamicCacheChangeBatch) msg, topVer);
     }
 
@@ -3127,6 +3188,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Cache descriptors.
+     */
+    public Collection<DynamicCacheDescriptor> cacheDescriptors() {
+        return registeredCaches.values();
+    }
+
+    /**
      * @param cacheId Cache ID.
      * @return Cache descriptor.
      */
@@ -3438,10 +3506,45 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If validation failed.
      * @return Configuration copy.
      */
-    private CacheConfiguration cloneCheckSerializable(CacheConfiguration val) throws IgniteCheckedException {
+    private CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException {
         if (val == null)
             return null;
 
+        return withBinaryContext(new IgniteOutClosureX<CacheConfiguration>() {
+            @Override public CacheConfiguration applyx() throws IgniteCheckedException {
+                if (val.getCacheStoreFactory() != null) {
+                    try {
+                        ClassLoader ldr = ctx.config().getClassLoader();
+
+                        if (ldr == null)
+                            ldr = val.getCacheStoreFactory().getClass().getClassLoader();
+
+                        marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()),
+                            U.resolveClassLoader(ldr, ctx.config()));
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteCheckedException("Failed to validate cache configuration. " +
+                            "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e);
+                    }
+                }
+
+                try {
+                    return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config()));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteCheckedException("Failed to validate cache configuration " +
+                        "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param c Closure.
+     * @throws IgniteCheckedException If failed.
+     * @return Closure result.
+     */
+    private <T> T withBinaryContext(IgniteOutClosureX<T> c) throws IgniteCheckedException {
         IgniteCacheObjectProcessor objProc = ctx.cacheObjects();
         BinaryContext oldCtx = null;
 
@@ -3452,29 +3555,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
 
         try {
-            if (val.getCacheStoreFactory() != null) {
-                try {
-                    ClassLoader ldr = ctx.config().getClassLoader();
-
-                    if (ldr == null)
-                        ldr = val.getCacheStoreFactory().getClass().getClassLoader();
-
-                    marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()),
-                        U.resolveClassLoader(ldr, ctx.config()));
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteCheckedException("Failed to validate cache configuration. " +
-                        "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e);
-                }
-            }
-
-            try {
-                return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config()));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteCheckedException("Failed to validate cache configuration " +
-                    "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e);
-            }
+            return c.applyx();
         }
         finally {
             if (objProc instanceof CacheObjectBinaryProcessorImpl)
@@ -3483,6 +3564,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param obj Object to clone.
+     * @return Object copy obtained by marshalling and then unmarshalling {@code obj}.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <T> T clone(final T obj) throws IgniteCheckedException {
+        return withBinaryContext(new IgniteOutClosureX<T>() {
+            @Override public T applyx() throws IgniteCheckedException {
+                return marshaller.unmarshal(marshaller.marshal(obj), U.resolveClassLoader(ctx.config()));
+            }
+        });
+    };
+
+    /**
      * @param name Name to mask.
      * @return Masked name.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 66304e4..03735cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1659,6 +1659,19 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteInternalCache<K, V> withNoRetries() {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return new GridCacheProxyImpl<>(ctx, delegate,
+                new CacheOperationContext(false, null, false, null, true, null));
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(ctx);
         out.writeObject(delegate);


Mime
View raw message