ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [24/28] incubator-ignite git commit: ignite-545: merge from sprint-6
Date Wed, 10 Jun 2015 14:11:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 0e1a9c2..871cd77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.datastructures.*;
@@ -47,7 +48,6 @@ import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -153,7 +153,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cfg.setMemoryMode(DFLT_MEMORY_MODE);
 
         if (cfg.getNodeFilter() == null)
-            cfg.setNodeFilter(CacheConfiguration.SERVER_NODES);
+            cfg.setNodeFilter(CacheConfiguration.ALL_NODES);
 
         if (cfg.getAffinity() == null) {
             if (cfg.getCacheMode() == PARTITIONED) {
@@ -541,10 +541,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration());
 
-        ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() {
-            @Override public void apply(Serializable evt) {
-                if (evt instanceof DynamicCacheChangeBatch)
-                    onCacheChangeRequested((DynamicCacheChangeBatch)evt);
+        ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
+            new CustomEventListener<DynamicCacheChangeBatch>() {
+            @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) {
+                onCacheChangeRequested(msg);
             }
         });
 
@@ -567,7 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
-        sharedCtx = createSharedContext(ctx);
+        sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
+            ctx.config().getCacheStoreSessionListenerFactories()));
 
         ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
             !ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -622,9 +623,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 ctx.discovery().setCacheFilter(
                     cfg.getName(),
                     cfg.getNodeFilter(),
-                    cfg.getNearConfiguration() != null,
+                    cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
                     cfg.getCacheMode() == LOCAL);
 
+                ctx.discovery().addClientNode(cfg.getName(),
+                    ctx.localNodeId(),
+                    cfg.getNearConfiguration() != null);
+
                 if (!cacheType.userCache())
                     stopSeq.addLast(cfg.getName());
                 else
@@ -669,6 +674,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
                 for (ClusterNode n : ctx.discovery().remoteNodes()) {
+                    if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
+                        continue;
+
                     checkTransactionConfiguration(n);
 
                     DeploymentMode locDepMode = ctx.config().getDeploymentMode();
@@ -683,7 +691,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         if (rmtCfg != null) {
                             CacheConfiguration locCfg = desc.cacheConfiguration();
 
-                            checkCache(locCfg, rmtCfg, n);
+                            checkCache(locCfg, rmtCfg, n, desc);
 
                             // Check plugin cache configurations.
                             CachePluginManager pluginMgr = desc.pluginManager();
@@ -706,12 +714,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 IgnitePredicate filter = ccfg.getNodeFilter();
 
-                if (filter.apply(locNode)) {
+                boolean loc = desc.locallyConfigured();
+
+                if (loc || CU.affinityNode(locNode, filter)) {
                     CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
                     CachePluginManager pluginMgr = desc.pluginManager();
 
-                    GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+                    GridCacheContext ctx = createCache(
+                        ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
 
                     ctx.dynamicDeploymentId(desc.deploymentId());
 
@@ -754,8 +765,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 int order = cfg.getRebalanceOrder();
 
                 if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) {
-                    GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts
-                        .get(order);
+                    GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order);
 
                     if (fut == null) {
                         fut = new GridCompoundFuture<>();
@@ -774,18 +784,31 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheAdapter<?, ?> cache : caches.values())
             onKernalStart(cache);
 
+        boolean utilityCacheStarted = false;
+
         // Wait for caches in SYNC preload mode.
-        for (GridCacheAdapter<?, ?> cache : caches.values()) {
-            CacheConfiguration cfg = cache.configuration();
+        for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) {
+            GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
+
+            if (cache != null) {
+                if (cfg.getRebalanceMode() == SYNC) {
+                    if (cfg.getCacheMode() == REPLICATED ||
+                        (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) {
+                        cache.preloader().syncFuture().get();
+
+                        if (CU.isUtilityCache(cache.name())) {
+                            ctx.cacheObjects().onUtilityCacheStarted();
 
-            if (cfg.getRebalanceMode() == SYNC) {
-                if (cfg.getCacheMode() == REPLICATED ||
-                    (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0))
-                    cache.preloader().syncFuture().get();
+                            utilityCacheStarted = true;
+                        }
+                    }
+                }
             }
         }
 
-        ctx.cacheObjects().onCacheProcessorStarted();
+        assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
+        assert caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
+        assert utilityCacheStarted;
     }
 
     /** {@inheritDoc} */
@@ -812,6 +835,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             mgr.stop(cancel);
         }
 
+        CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners());
+
         sharedCtx.cleanup();
 
         if (log.isDebugEnabled())
@@ -1047,7 +1072,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
         @Nullable CachePluginManager pluginMgr,
         CacheType cacheType,
-        CacheObjectContext cacheObjCtx)
+        CacheObjectContext cacheObjCtx,
+        boolean updatesAllowed)
         throws IgniteCheckedException
     {
         assert cfg != null;
@@ -1105,6 +1131,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cfg,
             cacheType,
             ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+            updatesAllowed,
 
             /*
              * Managers in starting order!
@@ -1234,6 +1261,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 cfg,
                 cacheType,
                 ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+                true,
 
                 /*
                  * Managers in starting order!
@@ -1423,7 +1451,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         ClusterNode locNode = ctx.discovery().localNode();
 
-        boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode);
+        boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
         boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
 
         if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
@@ -1437,7 +1465,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-            GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx);
+            GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
 
             cacheCtx.startTopologyVersion(topVer);
 
@@ -1562,10 +1590,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
+     * @param storeSesLsnrs Store session listeners.
      * @return Shared context.
      */
     @SuppressWarnings("unchecked")
-    private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) {
+    private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
+        Collection<CacheStoreSessionListener> storeSesLsnrs) {
         IgniteTxManager tm = new IgniteTxManager();
         GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
         GridCacheVersionManager verMgr = new GridCacheVersionManager();
@@ -1580,7 +1610,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             mvccMgr,
             depMgr,
             exchMgr,
-            ioMgr
+            ioMgr,
+            storeSesLsnrs
         );
     }
 
@@ -1867,7 +1898,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     // Check if we were asked to start a near cache.
                     if (nearCfg != null) {
-                        if (descCfg.getNodeFilter().apply(ctx.discovery().localNode())) {
+                        if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) {
                             // If we are on a data node and near cache was enabled, return success, else - fail.
                             if (descCfg.getNearConfiguration() != null)
                                 return new GridFinishedFuture<>();
@@ -1914,7 +1945,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 return new GridFinishedFuture<>(new CacheExistsException("Failed to start near cache " +
                     "(a cache with the given name is not started): " + cacheName));
 
-            if (ccfg.getNodeFilter().apply(ctx.discovery().localNode())) {
+            if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter())) {
                 if (ccfg.getNearConfiguration() != null)
                     return new GridFinishedFuture<>();
                 else
@@ -2202,11 +2233,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Checks that remote caches has configuration compatible with the local.
      *
+     * @param locCfg Local configuration.
+     * @param rmtCfg Remote configuration.
      * @param rmtNode Remote node.
+     * @param desc Cache descriptor.
      * @throws IgniteCheckedException If check failed.
      */
-    private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode)
-        throws IgniteCheckedException {
+    private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode,
+        DynamicCacheDescriptor desc) throws IgniteCheckedException {
         ClusterNode locNode = ctx.discovery().localNode();
 
         UUID rmt = rmtNode.id();
@@ -2214,6 +2248,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg);
         GridCacheAttributes locAttr = new GridCacheAttributes(locCfg);
 
+        boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter());
+        boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter());
+
         CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
             locAttr.cacheMode(), rmtAttr.cacheMode(), true);
 
@@ -2227,8 +2264,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
                 "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
 
-            if (locCfg.getAtomicityMode() == TRANSACTIONAL ||
-                (rmtCfg.getNodeFilter().apply(rmtNode) && locCfg.getNodeFilter().apply(locNode)))
+            boolean checkStore;
+
+            if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL) {
+                checkStore = locAttr.storeFactoryClassName() != null;
+
+                if (locAttr.storeFactoryClassName() == null && rmtAttr.storeFactoryClassName() != null)
+                    desc.updatesAllowed(false);
+            }
+            else
+                checkStore = isLocAff && isRmtAff;
+
+            if (checkStore)
                 CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
                     locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
 
@@ -2547,7 +2594,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
+    public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
         return publicJCache(cacheName, true);
     }
 
@@ -2561,7 +2608,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    @Nullable public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
+    @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
         throws IgniteCheckedException
     {
         if (log.isDebugEnabled())
@@ -2569,7 +2616,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         String masked = maskNull(cacheName);
 
-        IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(masked);
+        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked);
 
         DynamicCacheDescriptor desc = registeredCaches.get(masked);
 
@@ -2579,7 +2626,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (cache == null)
            cache = startJCache(cacheName, failIfNotStarted);
 
-        return cache;
+        return (IgniteCacheProxy<K, V>)cache;
     }
 
     /**
@@ -2589,7 +2636,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.
      */
-    private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
+    private IgniteCacheProxy startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
         String masked = maskNull(cacheName);
 
         DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -2619,7 +2666,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         F.first(initiateCacheChanges(F.asList(req))).get();
 
-        IgniteCache cache = jCacheProxies.get(masked);
+        IgniteCacheProxy cache = jCacheProxies.get(masked);
 
         if (cache == null && failIfNotStarted)
             throw new IllegalArgumentException("Cache is not started: " + cacheName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 55d2f84..63ba242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -329,7 +329,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException {
+    @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
@@ -341,6 +341,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.getAllOutTxAsync(keys);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isIgfsDataCache() {
         CacheOperationContext prev = gate.enter(opCtx);
 
@@ -741,6 +753,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public Set<K> keySetx() {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.keySetx();
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
         CacheOperationContext prev = gate.enter(opCtx);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 294c2b0..1071ef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.timeout.*;
@@ -76,6 +78,9 @@ public class GridCacheSharedContext<K, V> {
     /** Preloaders start future. */
     private IgniteInternalFuture<Object> preloadersStartFut;
 
+    /** Store session listeners. */
+    private Collection<CacheStoreSessionListener> storeSesLsnrs;
+
     /**
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
@@ -88,7 +93,8 @@ public class GridCacheSharedContext<K, V> {
         GridCacheMvccManager mvccMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
-        GridCacheIoManager ioMgr
+        GridCacheIoManager ioMgr,
+        Collection<CacheStoreSessionListener> storeSesLsnrs
     ) {
         this.kernalCtx = kernalCtx;
         this.mvccMgr = add(mvccMgr);
@@ -97,6 +103,7 @@ public class GridCacheSharedContext<K, V> {
         this.depMgr = add(depMgr);
         this.exchMgr = add(exchMgr);
         this.ioMgr = add(ioMgr);
+        this.storeSesLsnrs = storeSesLsnrs;
 
         txMetrics = new TransactionMetricsAdapter();
 
@@ -427,27 +434,38 @@ public class GridCacheSharedContext<K, V> {
      * @param tx Transaction to check.
      * @param activeCacheIds Active cache IDs.
      * @param cacheCtx Cache context.
-     * @return {@code True} if cross-cache transaction can include this new cache.
+     * @return Error message if transactions are incompatible.
      */
-    public boolean txCompatible(IgniteInternalTx tx, Iterable<Integer> activeCacheIds, GridCacheContext<K, V> cacheCtx) {
-        if (cacheCtx.systemTx() ^ tx.system())
-            return false;
+    @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable<Integer> activeCacheIds,
+        GridCacheContext<K, V> cacheCtx) {
+        if (cacheCtx.systemTx() && !tx.system())
+            return "system cache can be enlisted only in system transaction";
+
+        if (!cacheCtx.systemTx() && tx.system())
+            return "non-system cache can't be enlisted in system transaction";
 
         for (Integer cacheId : activeCacheIds) {
             GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
 
-            // System transactions may sap only one cache.
             if (cacheCtx.systemTx()) {
                 if (activeCacheCtx.cacheId() != cacheCtx.cacheId())
-                    return false;
+                    return "system transaction can include only one cache";
             }
 
-            // Check that caches have the same store.
-            if (activeCacheCtx.store().store() != cacheCtx.store().store())
-                return false;
+            CacheStoreManager store = cacheCtx.store();
+            CacheStoreManager activeStore = activeCacheCtx.store();
+
+            if (store.isLocal() != activeStore.isLocal())
+                return "caches with local and non-local stores can't be enlisted in one transaction";
+
+            if (store.isWriteBehind() != activeStore.isWriteBehind())
+                return "caches with different write-behind setting can't be enlisted in one transaction";
+
+            // If local and write-behind validations passed, this must be true.
+            assert store.isWriteToStoreFromDht() == activeStore.isWriteToStoreFromDht();
         }
 
-        return true;
+        return null;
     }
 
     /**
@@ -499,6 +517,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @param tx Transaction to rollback.
      * @throws IgniteCheckedException If failed.
+     * @return Rollback future.
      */
     public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException {
         Collection<Integer> cacheIds = tx.activeCacheIds();
@@ -512,6 +531,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Store session listeners.
+     */
+    @Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() {
+        return storeSesLsnrs;
+    }
+
+    /**
      * @param mgr Manager to add.
      * @return Added manager.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..772e849 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -121,6 +121,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         warnFirstEvict();
 
                     writeToSwap(part, cctx.toCacheKeyObject(kb), vb);
+
+                    if (cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapEvict();
                 }
                 catch (IgniteCheckedException e) {
                     log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e);
@@ -395,8 +398,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Reconstituted swap entry or {@code null} if entry is obsolete.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException
-    {
+    @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException {
         assert e != null;
 
         checkIteratorQueue();
@@ -425,9 +427,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         int part = cctx.affinity().partition(key);
 
         // First check off-heap store.
-        if (offheapEnabled)
-            if (offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())))
+        if (offheapEnabled) {
+            boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapRead(contains);
+
+            if (contains)
                 return true;
+        }
 
         if (swapEnabled) {
             assert key != null;
@@ -436,6 +444,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
                 cctx.deploy().globalLoader());
 
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onSwapRead(valBytes != null);
+
             return valBytes != null;
         }
 
@@ -444,7 +455,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param key Key to read.
-     * @param keyBytes Key bytes.
      * @param part Key partition.
      * @param entryLocked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
@@ -481,6 +491,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             if (readOffheap && offheapEnabled) {
                 byte[] bytes = offheap.get(spaceName, part, key, keyBytes);
 
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRead(bytes != null);
+
                 if (bytes != null)
                     return swapEntry(unmarshalSwapEntry(bytes));
             }
@@ -524,6 +537,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
 
+            if (cctx.config().isStatisticsEnabled()) {
+                if (entryBytes != null)
+                    cctx.cache().metrics0().onOffHeapRemove();
+
+                cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
+            }
+
             if (entryBytes != null) {
                 GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
 
@@ -567,8 +587,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Value from swap or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key,
-        final int part)
+    @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, final int part)
         throws IgniteCheckedException {
         if (!swapEnabled)
             return null;
@@ -582,6 +601,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onSwapRead(rmv != null);
+
                 if (rmv != null) {
                     try {
                         GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
@@ -611,6 +633,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                                 null);
                         }
 
+                        if (cctx.config().isStatisticsEnabled())
+                            cctx.cache().metrics0().onSwapRemove();
+
                         // Always fire this event, since preloading depends on it.
                         onUnswapped(part, key, entry);
 
@@ -649,12 +674,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (!offheapEnabled && !swapEnabled)
             return null;
 
-        return read(entry.key(),
-            entry.key().valueBytes(cctx.cacheObjectContext()),
-            entry.partition(),
-            locked,
-            readOffheap,
-            readSwap);
+        return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
+            readOffheap, readSwap);
     }
 
     /**
@@ -730,6 +751,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         final GridCacheQueryManager qryMgr = cctx.queries();
 
         Collection<SwapKey> unprocessedKeys = null;
+
         final Collection<GridCacheBatchSwapEntry> res = new ArrayList<>(keys.size());
 
         // First try removing from offheap.
@@ -737,8 +759,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             for (KeyCacheObject key : keys) {
                 int part = cctx.affinity().partition(key);
 
-                byte[] entryBytes =
-                    offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                if(entryBytes != null && cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
 
                 if (entryBytes != null) {
                     GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
@@ -848,6 +872,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                                     null);
                             }
 
+                            if (cctx.config().isStatisticsEnabled())
+                                cctx.cache().metrics0().onSwapRemove();
+
                             // Always fire this event, since preloading depends on it.
                             onUnswapped(swapKey.partition(), key, entry);
 
@@ -880,7 +907,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        return offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+        boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+        if(rmv && cctx.config().isStatisticsEnabled())
+            cctx.cache().metrics0().onOffHeapRemove();
+
+        return rmv;
     }
 
     /**
@@ -925,6 +957,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     return;
 
                 try {
+                    if (cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onSwapRemove();
+
                     GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
 
                     if (entry == null)
@@ -942,11 +977,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         // First try offheap.
         if (offheapEnabled) {
-            byte[] val = offheap.remove(spaceName,
-                part,
-                key.value(cctx.cacheObjectContext(), false),
+            byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false),
                 key.valueBytes(cctx.cacheObjectContext()));
 
+            if(val != null && cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapRemove();
+
             if (val != null) {
                 if (c != null)
                     c.apply(val); // Probably we should read value and apply closure before removing...
@@ -1007,6 +1043,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             offheap.put(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), entry.marshal());
 
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapWrite();
+
             if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
                 cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
                     EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
@@ -1035,11 +1074,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         if (offheapEnabled) {
             for (GridCacheBatchSwapEntry swapEntry : swapped) {
-                offheap.put(spaceName,
-                    swapEntry.partition(),
-                    swapEntry.key(),
-                    swapEntry.key().valueBytes(cctx.cacheObjectContext()),
-                    swapEntry.marshal());
+                offheap.put(spaceName, swapEntry.partition(), swapEntry.key(),
+                    swapEntry.key().valueBytes(cctx.cacheObjectContext()), swapEntry.marshal());
+
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapWrite();
 
                 if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
                     cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
@@ -1071,6 +1110,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         qryMgr.onSwap(batchSwapEntry.key());
                 }
             }
+
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onSwapWrite(batch.size());
         }
     }
 
@@ -1082,17 +1124,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param entry Entry bytes.
      * @throws IgniteCheckedException If failed.
      */
-    private void writeToSwap(int part,
-        KeyCacheObject key,
-        byte[] entry)
-        throws IgniteCheckedException
-    {
+    private void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException {
         checkIteratorQueue();
 
         swapMgr.write(spaceName,
             new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
-            entry,
-            cctx.deploy().globalLoader());
+            entry, cctx.deploy().globalLoader());
+
+        if (cctx.config().isStatisticsEnabled())
+            cctx.cache().metrics0().onSwapWrite();
 
         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_SWAPPED))
             cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid) null, null,
@@ -1274,7 +1314,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     int part = cctx.affinity().partition(key);
 
-                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                    boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                    if(rmv && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapRemove();
                 }
                 else
                     it.removeX();
@@ -1432,6 +1475,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 return it.hasNext();
             }
 
+            @SuppressWarnings("unchecked")
             @Override protected void onRemove() throws IgniteCheckedException {
                 if (cur == null)
                     throw new IllegalStateException("Method next() has not yet been called, or the remove() method " +
@@ -1616,7 +1660,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     int part = cctx.affinity().partition(key);
 
-                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                    boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                    if(rmv && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapRemove();
                 }
 
                 @Override protected void onClose() throws IgniteCheckedException {
@@ -1646,7 +1693,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 int part = cctx.affinity().partition(key);
 
-                offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                if(rmv && cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5f9049a..9bd6321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
+        boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
+            !cctx.config().isEagerTtl() ||
+            CU.isAtomicsCache(cctx.name()) ||
+            CU.isMarshallerCache(cctx.name()) ||
+            CU.isUtilityCache(cctx.name()) ||
+            (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
+
+        if (cleanupDisabled)
             return;
 
         cleanupWorker = new CleanupWorker();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 549f42f..3bd2a45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
@@ -34,12 +35,14 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.plugin.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.*;
+import javax.cache.configuration.*;
 import javax.cache.expiry.*;
 import javax.cache.integration.*;
 import java.io.*;
@@ -114,13 +117,6 @@ public class GridCacheUtils {
             }
         };
 
-    /** Not evicted partitions. */
-    private static final IgnitePredicate PART_NOT_EVICTED = new P1<GridDhtLocalPartition>() {
-        @Override public boolean apply(GridDhtLocalPartition p) {
-            return p.state() != GridDhtPartitionState.EVICTED;
-        }
-    };
-
     /** */
     private static final IgniteClosure<Integer, GridCacheVersion[]> VER_ARR_FACTORY =
         new C1<Integer, GridCacheVersion[]>() {
@@ -398,30 +394,11 @@ public class GridCacheUtils {
      * @return Partition to state transformer.
      */
     @SuppressWarnings({"unchecked"})
-    public static <K, V> IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state() {
+    public static IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state() {
         return PART2STATE;
     }
 
     /**
-     * @return Not evicted partitions.
-     */
-    @SuppressWarnings( {"unchecked"})
-    public static <K, V> IgnitePredicate<GridDhtLocalPartition> notEvicted() {
-        return PART_NOT_EVICTED;
-    }
-
-    /**
-     * Gets all nodes on which cache with the same name is started.
-     *
-     * @param ctx Cache context.
-     * @return All nodes on which cache with the same name is started (including nodes
-     *      that may have already left).
-     */
-    public static Collection<ClusterNode> allNodes(GridCacheContext ctx) {
-        return allNodes(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
      * Gets all nodes on which cache with the same name is started.
      *
      * @param ctx Cache context.
@@ -446,59 +423,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets alive nodes.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
-     */
-    public static Collection<ClusterNode> aliveNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveCacheNodes(ctx.namex(), topOrder);
-    }
-
-    /**
-     * Gets remote nodes on which cache with the same name is started.
-     *
-     * @param ctx Cache context.
-     * @return Remote nodes on which cache with the same name is started.
-     */
-    public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx) {
-        return remoteNodes(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets remote node with at least one cache configured.
-     *
-     * @param ctx Shared cache context.
-     * @return Collection of nodes with at least one cache configured.
-     */
-    public static Collection<ClusterNode> remoteNodes(GridCacheSharedContext ctx) {
-        return remoteNodes(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets remote nodes on which cache with the same name is started.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Remote nodes on which cache with the same name is started.
-     */
-    public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().remoteCacheNodes(ctx.namex(), topOrder);
-    }
-
-    /**
-     * Gets alive nodes.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
-     */
-    public static Collection<ClusterNode> aliveRemoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveRemoteCacheNodes(ctx.namex(), topOrder);
-    }
-
-    /**
      * Gets remote nodes with at least one cache configured.
      *
      * @param ctx Cache shared context.
@@ -510,25 +434,15 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets alive nodes with at least one cache configured.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
-     */
-    public static Collection<ClusterNode> aliveCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveNodesWithCaches(topOrder);
-    }
-
-    /**
      * Gets alive remote nodes with at least one cache configured.
      *
      * @param ctx Cache context.
      * @param topOrder Maximum allowed node order.
      * @return Affinity nodes.
      */
-    public static Collection<ClusterNode> aliveRemoteCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveRemoteNodesWithCaches(topOrder);
+    public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx,
+        AffinityTopologyVersion topOrder) {
+        return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder);
     }
 
     /**
@@ -577,90 +491,34 @@ public class GridCacheUtils {
     }
 
     /**
-     * Checks if given node has specified cache started.
-     *
-     * @param cacheName Cache name.
-     * @param node Node to check.
-     * @return {@code True} if given node has specified cache started.
-     */
-    public static boolean cacheNode(String cacheName, ClusterNode node) {
-        return cacheNode(cacheName, (GridCacheAttributes[])node.attribute(ATTR_CACHE));
-    }
-
-    /**
-     * Checks if given attributes relate the the node which has (or had) specified cache started.
-     *
-     * @param cacheName Cache name.
-     * @param caches Node cache attributes.
-     * @return {@code True} if given node has specified cache started.
-     */
-    public static boolean cacheNode(String cacheName, GridCacheAttributes[] caches) {
-        if (caches != null)
-            for (GridCacheAttributes attrs : caches)
-                if (F.eq(cacheName, attrs.cacheName()))
-                    return true;
-
-        return false;
-    }
-
-    /**
-     * Gets oldest alive node for specified topology version.
-     *
-     * @param cctx Cache context.
-     * @return Oldest node for the current topology version.
-     */
-    public static ClusterNode oldest(GridCacheContext cctx) {
-        return oldest(cctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets oldest alive node across nodes with at least one cache configured.
-     *
-     * @param ctx Cache context.
-     * @return Oldest node.
-     */
-    public static ClusterNode oldest(GridCacheSharedContext ctx) {
-        return oldest(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets oldest alive node for specified topology version.
+     * Gets oldest alive server node with at least one cache configured for specified topology version.
      *
-     * @param cctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Oldest node for the given topology version.
+     * @param ctx Context.
+     * @param topVer Maximum allowed topology version.
+     * @return Oldest alive cache server node.
      */
-    public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) {
-        ClusterNode oldest = null;
+    @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
+        AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
 
-        for (ClusterNode n : aliveNodes(cctx, topOrder))
-            if (oldest == null || n.order() < oldest.order())
-                oldest = n;
-
-        assert oldest != null : "Failed to find oldest node for cache context [name=" + cctx.name() + ", topOrder=" + topOrder + ']';
-        assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder);
+        if (nodes.isEmpty())
+            return null;
 
-        return oldest;
+        return oldest(nodes);
     }
 
     /**
-     * Gets oldest alive node with at least one cache configured for specified topology version.
-     *
-     * @param cctx Shared cache context.
-     * @param topOrder Maximum allowed node order.
+     * @param nodes Nodes.
      * @return Oldest node for the given topology version.
      */
-    public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) {
+    @Nullable public static ClusterNode oldest(Collection<ClusterNode> nodes) {
         ClusterNode oldest = null;
 
-        for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) {
+        for (ClusterNode n : nodes) {
             if (oldest == null || n.order() < oldest.order())
                 oldest = n;
         }
 
-        assert oldest != null : "Failed to find oldest node with caches: " + topOrder;
-        assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder);
-
         return oldest;
     }
 
@@ -718,30 +576,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * @return Closure that converts tx entry to key.
-     */
-    @SuppressWarnings({"unchecked"})
-    public static <K, V> IgniteClosure<IgniteTxEntry, K> tx2key() {
-        return (IgniteClosure<IgniteTxEntry, K>)tx2key;
-    }
-
-    /**
-     * @return Closure that converts tx entry collection to key collection.
-     */
-    @SuppressWarnings({"unchecked"})
-    public static <K, V> IgniteClosure<Collection<IgniteTxEntry>, Collection<K>> txCol2Key() {
-        return (IgniteClosure<Collection<IgniteTxEntry>, Collection<K>>)txCol2key;
-    }
-
-    /**
-     * @return Converts transaction entry to cache entry.
-     */
-    @SuppressWarnings( {"unchecked"})
-    public static <K, V> IgniteClosure<IgniteTxEntry, GridCacheEntryEx> tx2entry() {
-        return (IgniteClosure<IgniteTxEntry, GridCacheEntryEx>)tx2entry;
-    }
-
-    /**
      * @return Closure which converts transaction entry xid to XID version.
      */
     @SuppressWarnings( {"unchecked"})
@@ -1451,13 +1285,7 @@ public class GridCacheUtils {
     }
 
     /**
-     * @return Cache ID for utility cache.
-     */
-    public static int utilityCacheId() {
-        return cacheId(UTILITY_CACHE_NAME);
-    }
-
-    /**
+     * @param cacheName Cache name.
      * @return Cache ID.
      */
     public static int cacheId(String cacheName) {
@@ -1688,7 +1516,7 @@ public class GridCacheUtils {
     /**
      * @param aff Affinity.
      * @param n Node.
-     * @return Predicate that evaulates to {@code true} if entry is primary for node.
+     * @return Predicate that evaluates to {@code true} if entry is primary for node.
      */
     public static CacheEntryPredicate cachePrimary(
         final Affinity aff,
@@ -1790,4 +1618,76 @@ public class GridCacheUtils {
 
         return res;
     }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
+     */
+    public static boolean clientNode(ClusterNode node) {
+        Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
+
+        assert clientModeAttr != null : node;
+
+        return clientModeAttr != null && clientModeAttr;
+    }
+
+    /**
+     * @param node Node.
+     * @param filter Node filter.
+     * @return {@code True} if node is not a client node and passes the given filter.
+     */
+    public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
+        return !clientNode(node) && filter.apply(node);
+    }
+
+    /**
+     * Creates and starts store session listeners.
+     *
+     * @param ctx Kernal context.
+     * @param factories Factories.
+     * @return Listeners.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public static Collection<CacheStoreSessionListener> startStoreSessionListeners(GridKernalContext ctx,
+        Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException {
+        if (factories == null)
+            return null;
+
+        Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length);
+
+        for (Factory<CacheStoreSessionListener> factory : factories) {
+            CacheStoreSessionListener lsnr = factory.create();
+
+            if (lsnr != null) {
+                ctx.resource().injectGeneric(lsnr);
+
+                if (lsnr instanceof LifecycleAware)
+                    ((LifecycleAware)lsnr).start();
+
+                lsnrs.add(lsnr);
+            }
+        }
+
+        return lsnrs;
+    }
+
+    /**
+     * Stops store session listeners.
+     *
+     * @param ctx Kernal context.
+     * @param sesLsnrs Session listeners.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public static void stopStoreSessionListeners(GridKernalContext ctx, Collection<CacheStoreSessionListener> sesLsnrs)
+        throws IgniteCheckedException {
+        if (sesLsnrs == null)
+            return;
+
+        for (CacheStoreSessionListener lsnr : sesLsnrs) {
+            if (lsnr instanceof LifecycleAware)
+                ((LifecycleAware)lsnr).stop();
+
+            ctx.resource().cleanupGeneric(lsnr);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f840015..4390993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -699,6 +699,29 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+        try {
+            CacheOperationContext prev = onEnter(opCtx);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAllOutTxAsync(keys));
+
+                    return null;
+                }
+                else
+                    return delegate.getAllOutTx(keys);
+            }
+            finally {
+                onLeave(prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
     /**
      * @param keys Keys.
      * @return Values map.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 5184115..d98379c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -775,6 +775,11 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public Set<K> keySet();
 
     /**
+     * @return Set of keys including internal keys.
+     */
+    public Set<K> keySetx();
+
+    /**
      * Set of keys for which this node is primary.
      * This set is dynamic and may change with grid topology changes.
      * Note that this set will contain mappings for all keys, even if their values are
@@ -1618,7 +1623,16 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
      * @return Value.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException;
+    public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException;
+
+    /**
+     * Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries
+     * and will not lock any keys if pessimistic transaction is started by thread.
+     *
+     * @param keys Keys to get values for.
+     * @return Future for getAllOutTx operation.
+     */
+    public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys);
 
     /**
      * Checks whether this cache is IGFS data cache.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 0186a90..0790052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -84,9 +84,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
     @Override public int[] primaryPartitions(ClusterNode n) {
         A.notNull(n, "n");
 
-        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
-
-        Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer);
+        Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topologyVersion());
 
         return U.toIntArray(parts);
     }
@@ -95,9 +93,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
     @Override public int[] backupPartitions(ClusterNode n) {
         A.notNull(n, "n");
 
-        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
-
-        Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer);
+        Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topologyVersion());
 
         return U.toIntArray(parts);
     }
@@ -108,7 +104,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
 
         Collection<Integer> parts = new HashSet<>();
 
-        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+        AffinityTopologyVersion topVer = topologyVersion();
 
         for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) {
             for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index fa8d192..b5c5161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -218,7 +218,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
                         }
                     },
                     new QueueHeaderPredicate(),
-                    cctx.isLocal() || cctx.isReplicated(),
+                    cctx.isLocal() || (cctx.isReplicated() && cctx.affinityNode()),
                     true);
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index b79f9d5..bd72764 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -327,13 +327,6 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     }
 
     /**
-     *
-     */
-    public void onUnlock() {
-        // No-op.
-    }
-
-    /**
      * Unlocks local lock.
      *
      * @return Removed candidate, or <tt>null</tt> if thread still holds the lock.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index fded3c9..bd1dedf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable {
     /** {@code True} if mapping is for near caches, {@code false} otherwise. */
     private boolean near;
 
+    /** {@code True} if this is first mapping for optimistic tx on client node. */
+    private boolean clientFirst;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable {
     }
 
     /**
+     * @return {@code True} if this is first mapping for optimistic tx on client node.
+     */
+    public boolean clientFirst() {
+        return clientFirst;
+    }
+
+    /**
+     * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node.
+     */
+    public void clientFirst(boolean clientFirst) {
+        this.clientFirst = clientFirst;
+    }
+
+    /**
      * @return {@code True} if mapping is for near caches, {@code false} otherwise.
      */
     public boolean near() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 331de4e..c3f3e7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -210,7 +210,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx, topVer);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+
+            assert oldest != null;
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -218,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -665,7 +667,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId.equals(cctx.localNodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
@@ -715,7 +717,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
 
         ClusterNode loc = cctx.localNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 303d649..7bae7f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -96,12 +96,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl
 
     /**
      * @param node Node.
-     * @param res Reponse.
+     * @param res Response.
      */
     public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) {
         if (!res.topologyVersion().equals(topVer)) {
             if (log.isDebugEnabled())
-                log.debug("Received affinity assignment for wrong topolgy version (will ignore) " +
+                log.debug("Received affinity assignment for wrong topology version (will ignore) " +
                     "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']');
 
             return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 10b84e2..adea9e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -54,7 +54,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     private GridDhtPartitionTopology top;
 
     /** Preloader. */
-    protected GridCachePreloader<K, V> preldr;
+    protected GridCachePreloader preldr;
 
     /** Multi tx future holder. */
     private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>();
@@ -75,7 +75,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
         super(ctx, ctx.config().getStartSize());
 
-        top = new GridDhtPartitionTopologyImpl<>(ctx);
+        top = new GridDhtPartitionTopologyImpl(ctx);
     }
 
     /**
@@ -87,7 +87,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) {
         super(ctx, map);
 
-        top = new GridDhtPartitionTopologyImpl<>(ctx);
+        top = new GridDhtPartitionTopologyImpl(ctx);
     }
 
     /** {@inheritDoc} */
@@ -168,17 +168,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override public GridCachePreloader<K, V> preloader() {
+    @Override public GridCachePreloader preloader() {
         return preldr;
     }
 
     /**
      * @return DHT preloader.
      */
-    public GridDhtPreloader<K, V> dhtPreloader() {
+    public GridDhtPreloader dhtPreloader() {
         assert preldr instanceof GridDhtPreloader;
 
-        return (GridDhtPreloader<K, V>)preldr;
+        return (GridDhtPreloader)preldr;
     }
 
     /**
@@ -932,6 +932,21 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * @param expVer Expected topology version.
+     * @param curVer Current topology version.
+     * @return {@code True} if cache affinity changed and operation should be remapped.
+     */
+    protected final boolean needRemap(AffinityTopologyVersion expVer, AffinityTopologyVersion curVer) {
+        if (expVer.equals(curVer))
+            return false;
+
+        Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
+        Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
+
+        return !cacheNodes0.equals(cacheNodes1);
+    }
+
+    /**
      * @param primary If {@code true} includes primary entries.
      * @param backup If {@code true} includes backup entries.
      * @return Local entries iterator.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index c9a7af8..89b85c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -292,12 +292,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
         return ret;
     }
 
-    /**
-     * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
-     */
+    /** {@inheritDoc} */
     @Override public void onUnlock() {
-        super.onUnlock();
-
         locPart.onUnlock();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index f6f930e..742fbfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -295,6 +295,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                     if (info == null)
                         continue;
 
+                    boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
+
+                    if (addReader)
+                        e.unswap(false);
+
                     // Register reader. If there are active transactions for this entry,
                     // then will wait for their completion before proceeding.
                     // TODO: GG-4003:
@@ -303,8 +308,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
                     // TODO: To fix, check that reader is contained in the list of readers once
                     // TODO: again after the returned future completes - if not, try again.
                     // TODO: Also, why is info read before transactions are complete, and not after?
-                    IgniteInternalFuture<Boolean> f = (!e.deleted() && k.getValue() && !skipVals) ?
-                        e.addReader(reader, msgId, topVer) : null;
+                    IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
 
                     if (f != null) {
                         if (txFut == null)
@@ -317,6 +321,9 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
 
                     break;
                 }
+                catch (IgniteCheckedException err) {
+                    return new GridFinishedFuture<>(err);
+                }
                 catch (GridCacheEntryRemovedException ignore) {
                     if (log.isDebugEnabled())
                         log.debug("Got removed entry when getting a DHT value: " + e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index c57eded..bdaa552 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -47,7 +47,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.*;
 /**
  * Cache lock future.
  */
-public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
     /** */
     private static final long serialVersionUID = 0L;
@@ -60,7 +60,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
 
     /** Cache registry. */
     @GridToStringExclude
-    private GridCacheContext<K, V> cctx;
+    private GridCacheContext<?, ?> cctx;
 
     /** Near node ID. */
     private UUID nearNodeId;
@@ -151,7 +151,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param skipStore Skip store flag.
      */
     public GridDhtLockFuture(
-        GridCacheContext<K, V> cctx,
+        GridCacheContext<?, ?> cctx,
         UUID nearNodeId,
         GridCacheVersion nearLockVer,
         @NotNull AffinityTopologyVersion topVer,
@@ -221,7 +221,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
      * @param cacheCtx Cache context.
      * @param invalidPart Partition to retry.
      */
-    void addInvalidPartition(GridCacheContext<K, V> cacheCtx, int invalidPart) {
+    void addInvalidPartition(GridCacheContext<?, ?> cacheCtx, int invalidPart) {
         invalidParts.add(invalidPart);
 
         // Register invalid partitions with transaction.
@@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
          * @param entries Entries to check.
          */
         @SuppressWarnings({"ForLoopReplaceableByForEach"})
-        private void evictReaders(GridCacheContext<K, V> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
+        private void evictReaders(GridCacheContext<?, ?> cacheCtx, Collection<IgniteTxKey> keys, UUID nodeId, long msgId,
             @Nullable List<GridDhtCacheEntry> entries) {
             if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty())
                 return;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 073e0e7..374ab87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  * Partition topology.
  */
 @GridToStringExclude
-class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
+class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -49,7 +49,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
     private static final boolean FULL_MAP_DEBUG = false;
 
     /** Context. */
-    private final GridCacheContext<K, V> cctx;
+    private final GridCacheContext<?, ?> cctx;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -85,7 +85,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
     /**
      * @param cctx Context.
      */
-    GridDhtPartitionTopologyImpl(GridCacheContext<K, V> cctx) {
+    GridDhtPartitionTopologyImpl(GridCacheContext<?, ?> cctx) {
         assert cctx != null;
 
         this.cctx = cctx;
@@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx.shared(), topVer);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+            assert oldest != null;
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -247,7 +249,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -274,7 +276,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
             if (cctx.rebalanceEnabled()) {
                 for (int p = 0; p < num; p++) {
                     // If this is the first node in grid.
-                    boolean added = exchFut.isCacheAdded(cctx.cacheId());
+                    boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion());
 
                     if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) {
                         assert exchId.isJoined() || added;
@@ -604,7 +606,7 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
         try {
             return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(),
-                F.viewReadOnly(locParts, CU.<K, V>part2state()), true);
+                F.viewReadOnly(locParts, CU.part2state()), true);
         }
         finally {
             lock.readLock().unlock();
@@ -660,13 +662,15 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
      * @return List of nodes for the partition.
      */
     private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
-        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null;
+        Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
 
         lock.readLock().lock();
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
-                ", allIds=" + allIds + ", node2part=" + node2part + ']';
+                ", allIds=" + allIds +
+                ", node2part=" + node2part +
+                ", cache=" + cctx.name() + ']';
 
             Collection<UUID> nodeIds = part2node.get(p);
 
@@ -738,7 +742,11 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
-                ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']';
+                ", cache=" + cctx.name() +
+                ", started=" + cctx.started() +
+                ", stopping=" + stopping +
+                ", locNodeId=" + cctx.localNode().id() +
+                ", locName=" + cctx.gridName() + ']';
 
             GridDhtPartitionFullMap m = node2part;
 
@@ -756,6 +764,8 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         if (log.isDebugEnabled())
             log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
 
+        assert partMap != null;
+
         lock.writeLock().lock();
 
         try {
@@ -1024,7 +1034,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         assert nodeId.equals(cctx.nodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+        assert oldest != null;
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.nodeId())) {
@@ -1074,7 +1086,9 @@ class GridDhtPartitionTopologyImpl<K, V> implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+        assert oldest != null;
 
         ClusterNode loc = cctx.localNode();
 


Mime
View raw message