ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [26/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
Date Wed, 10 Jun 2015 16:27:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0ecaf97..3236bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -97,7 +97,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private final AtomicReference<AffinityTopologyVersion> readyTopVer =
         new AtomicReference<>(AffinityTopologyVersion.NONE);
 
-
     /**
      * Partition map futures.
      * This set also contains already completed exchange futures to address race conditions when coordinator
@@ -150,8 +149,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 else {
                     DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
 
-                    if (customEvt.data() instanceof DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data();
+                    if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
+                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
 
                         Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
 
@@ -554,7 +553,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * Partition refresh callback.
      */
     void refreshPartitions() {
-        ClusterNode oldest = CU.oldest(cctx);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+        if (oldest == null) {
+            if (log.isDebugEnabled())
+                log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']');
+
+            return;
+        }
 
         if (log.isDebugEnabled())
             log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
@@ -564,7 +570,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         try {
             // If this is the oldest node.
             if (oldest.id().equals(cctx.localNodeId())) {
-                rmts = CU.remoteNodes(cctx);
+                rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
 
                 if (log.isDebugEnabled())
                     log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
@@ -641,7 +647,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
         throws IgniteCheckedException {
-        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+        GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+            cctx.kernalContext().clientNode(),
+            cctx.versions().last());
 
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (!cacheCtx.isLocal()) {
@@ -687,6 +695,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param exchId Exchange ID.
      * @param discoEvt Discovery event.
+     * @param reqs Cache change requests.
      * @return Exchange future.
      */
     GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@@ -696,9 +705,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         GridDhtPartitionsExchangeFuture old = exchFuts.addx(
             fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
 
-        if (old != null)
+        if (old != null) {
             fut = old;
 
+            if (reqs != null)
+                fut.cacheChangeRequests(reqs);
+        }
+
         if (discoEvt != null)
             fut.onEvent(exchId, discoEvt);
 
@@ -827,7 +840,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @param node Node ID.
      * @param msg Message.
      */
-    private void processSinglePartitionUpdate(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+    private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
         if (!enterBusy())
             return;
 
@@ -858,8 +871,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (updated)
                     scheduleResendPartitions();
             }
-            else
-                exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+            else {
+                if (msg.client()) {
+                    final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+                        null,
+                        null);
+
+                    exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                            // Finished future should reply only to sender client node.
+                            exchFut.onReceive(node.id(), msg);
+                        }
+                    });
+                }
+                else
+                    exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+            }
         }
         finally {
             leaveBusy();
@@ -982,7 +1009,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     busy = true;
 
-                    Map<Integer, GridDhtPreloaderAssignments<K, V>> assignsMap = new HashMap<>();
+                    Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
 
                     boolean dummyReassign = exchFut.dummyReassign();
                     boolean forcePreload = exchFut.forcePreload();
@@ -1017,7 +1044,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 changed |= cacheCtx.topology().afterExchange(exchFut);
 
                                 // Preload event notification.
-                                if (cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
+                                if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
                                     if (!cacheCtx.isReplicated() || !startEvtFired) {
                                         DiscoveryEvent discoEvt = exchFut.discoveryEvent();
 
@@ -1043,16 +1070,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             }
                         }
 
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                            long delay = cacheCtx.config().getRebalanceDelay();
+                        if (!exchFut.skipPreload()) {
+                            assignsMap = new HashMap<>();
 
-                            GridDhtPreloaderAssignments<K, V> assigns = null;
+                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                long delay = cacheCtx.config().getRebalanceDelay();
+
+                                GridDhtPreloaderAssignments assigns = null;
 
-                            // Don't delay for dummy reassigns to avoid infinite recursion.
-                            if (delay == 0 || forcePreload)
-                                assigns = cacheCtx.preloader().assign(exchFut);
+                                // Don't delay for dummy reassigns to avoid infinite recursion.
+                                if (delay == 0 || forcePreload)
+                                    assigns = cacheCtx.preloader().assign(exchFut);
 
-                            assignsMap.put(cacheCtx.cacheId(), assigns);
+                                assignsMap.put(cacheCtx.cacheId(), assigns);
+                            }
                         }
                     }
                     finally {
@@ -1061,7 +1092,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     }
 
                     if (assignsMap != null) {
-                        for (Map.Entry<Integer, GridDhtPreloaderAssignments<K, V>> e : assignsMap.entrySet()) {
+                        for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
                             int cacheId = e.getKey();
 
                             GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
@@ -1113,20 +1144,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         /** {@inheritDoc} */
         @Override public void onTimeout() {
-            if (!busyLock.readLock().tryLock())
-                return;
+            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                @Override public void run() {
+                    if (!busyLock.readLock().tryLock())
+                        return;
 
-            try {
-                if (started.compareAndSet(false, true))
-                    refreshPartitions();
-            }
-            finally {
-                busyLock.readLock().unlock();
+                    try {
+                        if (started.compareAndSet(false, true))
+                            refreshPartitions();
+                    }
+                    finally {
+                        busyLock.readLock().unlock();
 
-                cctx.time().removeTimeoutObject(this);
+                        cctx.time().removeTimeoutObject(ResendTimeoutObject.this);
 
-                pendingResend.compareAndSet(this, null);
-            }
+                        pendingResend.compareAndSet(ResendTimeoutObject.this, null);
+                    }
+                }
+            });
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 2e181f9..e0f6181 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -30,7 +30,7 @@ import java.util.*;
  * Cache preloader that is responsible for loading cache entries either from remote
  * nodes (for distributed cache) or anywhere else at cache startup.
  */
-public interface GridCachePreloader<K, V> {
+public interface GridCachePreloader {
     /**
      * Starts preloading.
      *
@@ -78,7 +78,7 @@ public interface GridCachePreloader<K, V> {
      * @param exchFut Exchange future to assign.
      * @return Assignments.
      */
-    public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut);
+    public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.
@@ -86,7 +86,7 @@ public interface GridCachePreloader<K, V> {
      * @param assignments Assignments to add.
      * @param forcePreload Force preload flag.
      */
-    public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload);
+    public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 80d3d6b..b4f386f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -31,9 +31,9 @@ import java.util.*;
 /**
  * Adapter for preloading which always assumes that preloading finished.
  */
-public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> {
+public class GridCachePreloaderAdapter implements GridCachePreloader {
     /** Cache context. */
-    protected final GridCacheContext<K, V> cctx;
+    protected final GridCacheContext<?, ?> cctx;
 
     /** Logger.*/
     protected final IgniteLogger log;
@@ -50,7 +50,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
     /**
      * @param cctx Cache context.
      */
-    public GridCachePreloaderAdapter(GridCacheContext<K, V> cctx) {
+    public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
         assert cctx != null;
 
         this.cctx = cctx;
@@ -126,17 +126,18 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
         // No-op.
     }
 
+    /** {@inheritDoc} */
     @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
         // No-op.
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+    @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
         // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7c2dfe9..33b25c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.datastructures.*;
@@ -47,7 +48,6 @@ import org.apache.ignite.internal.processors.plugin.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -153,7 +153,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cfg.setMemoryMode(DFLT_MEMORY_MODE);
 
         if (cfg.getNodeFilter() == null)
-            cfg.setNodeFilter(CacheConfiguration.SERVER_NODES);
+            cfg.setNodeFilter(CacheConfiguration.ALL_NODES);
 
         if (cfg.getAffinity() == null) {
             if (cfg.getCacheMode() == PARTITIONED) {
@@ -265,7 +265,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         // Suppress warning if at least one ATOMIC cache found.
         perf.add("Enable ATOMIC mode if not using transactions (set 'atomicityMode' to ATOMIC)",
-            cfg.getAtomicityMode() == ATOMIC);
+                 cfg.getAtomicityMode() == ATOMIC);
 
         // Suppress warning if at least one non-FULL_SYNC mode found.
         perf.add("Disable fully synchronous writes (set 'writeSynchronizationMode' to PRIMARY_SYNC or FULL_ASYNC)",
@@ -425,7 +425,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         if (cc.getAtomicityMode() == ATOMIC)
             assertParameter(cc.getTransactionManagerLookupClassName() == null,
-                "transaction manager can not be used with ATOMIC cache");
+                            "transaction manager can not be used with ATOMIC cache");
     }
 
     /**
@@ -541,10 +541,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration());
 
-        ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() {
-            @Override public void apply(Serializable evt) {
-                if (evt instanceof DynamicCacheChangeBatch)
-                    onCacheChangeRequested((DynamicCacheChangeBatch)evt);
+        ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
+            new CustomEventListener<DynamicCacheChangeBatch>() {
+            @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) {
+                onCacheChangeRequested(msg);
             }
         });
 
@@ -567,7 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
 
-        sharedCtx = createSharedContext(ctx);
+        sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
+            ctx.config().getCacheStoreSessionListenerFactories()));
 
         ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)",
             !ctx.config().getTransactionConfiguration().isTxSerializableEnabled());
@@ -622,9 +623,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 ctx.discovery().setCacheFilter(
                     cfg.getName(),
                     cfg.getNodeFilter(),
-                    cfg.getNearConfiguration() != null,
+                    cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
                     cfg.getCacheMode() == LOCAL);
 
+                ctx.discovery().addClientNode(cfg.getName(),
+                    ctx.localNodeId(),
+                    cfg.getNearConfiguration() != null);
+
                 if (!cacheType.userCache())
                     stopSeq.addLast(cfg.getName());
                 else
@@ -669,6 +674,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
                 for (ClusterNode n : ctx.discovery().remoteNodes()) {
+                    if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
+                        continue;
+
                     checkTransactionConfiguration(n);
 
                     DeploymentMode locDepMode = ctx.config().getDeploymentMode();
@@ -683,7 +691,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         if (rmtCfg != null) {
                             CacheConfiguration locCfg = desc.cacheConfiguration();
 
-                            checkCache(locCfg, rmtCfg, n);
+                            checkCache(locCfg, rmtCfg, n, desc);
 
                             // Check plugin cache configurations.
                             CachePluginManager pluginMgr = desc.pluginManager();
@@ -706,12 +714,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 IgnitePredicate filter = ccfg.getNodeFilter();
 
-                if (filter.apply(locNode)) {
+                boolean loc = desc.locallyConfigured();
+
+                if (loc || CU.affinityNode(locNode, filter)) {
                     CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
                     CachePluginManager pluginMgr = desc.pluginManager();
 
-                    GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx);
+                    GridCacheContext ctx = createCache(
+                        ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed());
 
                     ctx.dynamicDeploymentId(desc.deploymentId());
 
@@ -754,8 +765,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 int order = cfg.getRebalanceOrder();
 
                 if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) {
-                    GridCompoundFuture<Object, Object> fut = (GridCompoundFuture<Object, Object>)preloadFuts
-                        .get(order);
+                    GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order);
 
                     if (fut == null) {
                         fut = new GridCompoundFuture<>();
@@ -776,20 +786,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         // Wait for caches in SYNC preload mode.
         for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) {
-            GridCacheAdapter<?, ?> cache = caches.get(maskNull(cfg.getName()));
+            GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));
 
-            if (cache == null)
-                continue;
+            if (cache != null) {
+                if (cfg.getRebalanceMode() == SYNC) {
+                    if (cfg.getCacheMode() == REPLICATED ||
+                        (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) {
+                        cache.preloader().syncFuture().get();
 
-            if (cfg.getRebalanceMode() == SYNC) {
-                if (cfg.getCacheMode() == REPLICATED ||
-                    (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0))
-                    cache.preloader().syncFuture().get();
+                        if (CU.isUtilityCache(cache.name()))
+                            ctx.cacheObjects().onUtilityCacheStarted();
+                    }
+                }
             }
-
-            if (CU.isUtilityCache(cache.name()))
-                ctx.cacheObjects().onUtilityCacheStarted();
         }
+
+        assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
+        assert caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
     }
 
     /** {@inheritDoc} */
@@ -816,6 +829,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             mgr.stop(cancel);
         }
 
+        CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners());
+
         sharedCtx.cleanup();
 
         if (log.isDebugEnabled())
@@ -1051,7 +1066,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg,
         @Nullable CachePluginManager pluginMgr,
         CacheType cacheType,
-        CacheObjectContext cacheObjCtx)
+        CacheObjectContext cacheObjCtx,
+        boolean updatesAllowed)
         throws IgniteCheckedException
     {
         assert cfg != null;
@@ -1109,6 +1125,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cfg,
             cacheType,
             ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+            updatesAllowed,
 
             /*
              * Managers in starting order!
@@ -1238,6 +1255,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 cfg,
                 cacheType,
                 ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+                true,
 
                 /*
                  * Managers in starting order!
@@ -1427,7 +1445,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         ClusterNode locNode = ctx.discovery().localNode();
 
-        boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode);
+        boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter);
         boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
 
         if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
@@ -1441,7 +1459,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
 
-            GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx);
+            GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true);
 
             cacheCtx.startTopologyVersion(topVer);
 
@@ -1566,10 +1584,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Creates shared context.
      *
      * @param kernalCtx Kernal context.
+     * @param storeSesLsnrs Store session listeners.
      * @return Shared context.
      */
     @SuppressWarnings("unchecked")
-    private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) {
+    private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx,
+        Collection<CacheStoreSessionListener> storeSesLsnrs) {
         IgniteTxManager tm = new IgniteTxManager();
         GridCacheMvccManager mvccMgr = new GridCacheMvccManager();
         GridCacheVersionManager verMgr = new GridCacheVersionManager();
@@ -1584,7 +1604,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             mvccMgr,
             depMgr,
             exchMgr,
-            ioMgr
+            ioMgr,
+            storeSesLsnrs
         );
     }
 
@@ -1871,7 +1892,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                     // Check if we were asked to start a near cache.
                     if (nearCfg != null) {
-                        if (descCfg.getNodeFilter().apply(ctx.discovery().localNode())) {
+                        if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) {
                             // If we are on a data node and near cache was enabled, return success, else - fail.
                             if (descCfg.getNearConfiguration() != null)
                                 return new GridFinishedFuture<>();
@@ -1918,7 +1939,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 return new GridFinishedFuture<>(new CacheExistsException("Failed to start near cache " +
                     "(a cache with the given name is not started): " + cacheName));
 
-            if (ccfg.getNodeFilter().apply(ctx.discovery().localNode())) {
+            if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter())) {
                 if (ccfg.getNearConfiguration() != null)
                     return new GridFinishedFuture<>();
                 else
@@ -2206,11 +2227,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * Checks that remote caches has configuration compatible with the local.
      *
+     * @param locCfg Local configuration.
+     * @param rmtCfg Remote configuration.
      * @param rmtNode Remote node.
+     * @param desc Cache descriptor.
      * @throws IgniteCheckedException If check failed.
      */
-    private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode)
-        throws IgniteCheckedException {
+    private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode,
+        DynamicCacheDescriptor desc) throws IgniteCheckedException {
         ClusterNode locNode = ctx.discovery().localNode();
 
         UUID rmt = rmtNode.id();
@@ -2218,6 +2242,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg);
         GridCacheAttributes locAttr = new GridCacheAttributes(locCfg);
 
+        boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter());
+        boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter());
+
         CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode",
             locAttr.cacheMode(), rmtAttr.cacheMode(), true);
 
@@ -2231,8 +2258,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode",
                 "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true);
 
-            if (locCfg.getAtomicityMode() == TRANSACTIONAL ||
-                (rmtCfg.getNodeFilter().apply(rmtNode) && locCfg.getNodeFilter().apply(locNode)))
+            boolean checkStore;
+
+            if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL) {
+                checkStore = locAttr.storeFactoryClassName() != null;
+
+                if (locAttr.storeFactoryClassName() == null && rmtAttr.storeFactoryClassName() != null)
+                    desc.updatesAllowed(false);
+            }
+            else
+                checkStore = isLocAff && isRmtAff;
+
+            if (checkStore)
                 CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory",
                     locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true);
 
@@ -2551,7 +2588,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
+    public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
         return publicJCache(cacheName, true);
     }
 
@@ -2565,7 +2602,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    @Nullable public <K, V> IgniteCache<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
+    @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
         throws IgniteCheckedException
     {
         if (log.isDebugEnabled())
@@ -2573,7 +2610,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         String masked = maskNull(cacheName);
 
-        IgniteCache<K,V> cache = (IgniteCache<K, V>)jCacheProxies.get(masked);
+        IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked);
 
         DynamicCacheDescriptor desc = registeredCaches.get(masked);
 
@@ -2583,7 +2620,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (cache == null)
            cache = startJCache(cacheName, failIfNotStarted);
 
-        return cache;
+        return (IgniteCacheProxy<K, V>)cache;
     }
 
     /**
@@ -2593,7 +2630,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.
      */
-    private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
+    private IgniteCacheProxy startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException {
         String masked = maskNull(cacheName);
 
         DynamicCacheDescriptor desc = registeredCaches.get(masked);
@@ -2623,7 +2660,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         F.first(initiateCacheChanges(F.asList(req))).get();
 
-        IgniteCache cache = jCacheProxies.get(masked);
+        IgniteCacheProxy cache = jCacheProxies.get(masked);
 
         if (cache == null && failIfNotStarted)
             throw new IllegalArgumentException("Cache is not started: " + cacheName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 55d2f84..63ba242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -329,7 +329,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException {
+    @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
@@ -341,6 +341,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.getAllOutTxAsync(keys);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean isIgfsDataCache() {
         CacheOperationContext prev = gate.enter(opCtx);
 
@@ -741,6 +753,18 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     }
 
     /** {@inheritDoc} */
+    @Override public Set<K> keySetx() {
+        CacheOperationContext prev = gate.enter(opCtx);
+
+        try {
+            return delegate.keySetx();
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
         CacheOperationContext prev = gate.enter(opCtx);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 294c2b0..1071ef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
@@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.timeout.*;
@@ -76,6 +78,9 @@ public class GridCacheSharedContext<K, V> {
     /** Preloaders start future. */
     private IgniteInternalFuture<Object> preloadersStartFut;
 
+    /** Store session listeners. */
+    private Collection<CacheStoreSessionListener> storeSesLsnrs;
+
     /**
      * @param txMgr Transaction manager.
      * @param verMgr Version manager.
@@ -88,7 +93,8 @@ public class GridCacheSharedContext<K, V> {
         GridCacheMvccManager mvccMgr,
         GridCacheDeploymentManager<K, V> depMgr,
         GridCachePartitionExchangeManager<K, V> exchMgr,
-        GridCacheIoManager ioMgr
+        GridCacheIoManager ioMgr,
+        Collection<CacheStoreSessionListener> storeSesLsnrs
     ) {
         this.kernalCtx = kernalCtx;
         this.mvccMgr = add(mvccMgr);
@@ -97,6 +103,7 @@ public class GridCacheSharedContext<K, V> {
         this.depMgr = add(depMgr);
         this.exchMgr = add(exchMgr);
         this.ioMgr = add(ioMgr);
+        this.storeSesLsnrs = storeSesLsnrs;
 
         txMetrics = new TransactionMetricsAdapter();
 
@@ -427,27 +434,38 @@ public class GridCacheSharedContext<K, V> {
      * @param tx Transaction to check.
      * @param activeCacheIds Active cache IDs.
      * @param cacheCtx Cache context.
-     * @return {@code True} if cross-cache transaction can include this new cache.
+     * @return Error message if transactions are incompatible.
      */
-    public boolean txCompatible(IgniteInternalTx tx, Iterable<Integer> activeCacheIds, GridCacheContext<K, V> cacheCtx) {
-        if (cacheCtx.systemTx() ^ tx.system())
-            return false;
+    @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable<Integer> activeCacheIds,
+        GridCacheContext<K, V> cacheCtx) {
+        if (cacheCtx.systemTx() && !tx.system())
+            return "system cache can be enlisted only in system transaction";
+
+        if (!cacheCtx.systemTx() && tx.system())
+            return "non-system cache can't be enlisted in system transaction";
 
         for (Integer cacheId : activeCacheIds) {
             GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
 
-            // System transactions may sap only one cache.
             if (cacheCtx.systemTx()) {
                 if (activeCacheCtx.cacheId() != cacheCtx.cacheId())
-                    return false;
+                    return "system transaction can include only one cache";
             }
 
-            // Check that caches have the same store.
-            if (activeCacheCtx.store().store() != cacheCtx.store().store())
-                return false;
+            CacheStoreManager store = cacheCtx.store();
+            CacheStoreManager activeStore = activeCacheCtx.store();
+
+            if (store.isLocal() != activeStore.isLocal())
+                return "caches with local and non-local stores can't be enlisted in one transaction";
+
+            if (store.isWriteBehind() != activeStore.isWriteBehind())
+                return "caches with different write-behind setting can't be enlisted in one transaction";
+
+            // If local and write-behind validations passed, this must be true.
+            assert store.isWriteToStoreFromDht() == activeStore.isWriteToStoreFromDht();
         }
 
-        return true;
+        return null;
     }
 
     /**
@@ -499,6 +517,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @param tx Transaction to rollback.
      * @throws IgniteCheckedException If failed.
+     * @return Rollback future.
      */
     public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException {
         Collection<Integer> cacheIds = tx.activeCacheIds();
@@ -512,6 +531,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Store session listeners.
+     */
+    @Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() {
+        return storeSesLsnrs;
+    }
+
+    /**
      * @param mgr Manager to add.
      * @return Added manager.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index eb82218..772e849 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -121,6 +121,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         warnFirstEvict();
 
                     writeToSwap(part, cctx.toCacheKeyObject(kb), vb);
+
+                    if (cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapEvict();
                 }
                 catch (IgniteCheckedException e) {
                     log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e);
@@ -395,8 +398,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Reconstituted swap entry or {@code null} if entry is obsolete.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException
-    {
+    @Nullable private <X extends GridCacheSwapEntry> X swapEntry(X e) throws IgniteCheckedException {
         assert e != null;
 
         checkIteratorQueue();
@@ -425,9 +427,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         int part = cctx.affinity().partition(key);
 
         // First check off-heap store.
-        if (offheapEnabled)
-            if (offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())))
+        if (offheapEnabled) {
+            boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapRead(contains);
+
+            if (contains)
                 return true;
+        }
 
         if (swapEnabled) {
             assert key != null;
@@ -436,6 +444,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
                 cctx.deploy().globalLoader());
 
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onSwapRead(valBytes != null);
+
             return valBytes != null;
         }
 
@@ -444,7 +455,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param key Key to read.
-     * @param keyBytes Key bytes.
      * @param part Key partition.
      * @param entryLocked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
@@ -481,6 +491,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             if (readOffheap && offheapEnabled) {
                 byte[] bytes = offheap.get(spaceName, part, key, keyBytes);
 
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRead(bytes != null);
+
                 if (bytes != null)
                     return swapEntry(unmarshalSwapEntry(bytes));
             }
@@ -524,6 +537,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
 
+            if (cctx.config().isStatisticsEnabled()) {
+                if (entryBytes != null)
+                    cctx.cache().metrics0().onOffHeapRemove();
+
+                cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
+            }
+
             if (entryBytes != null) {
                 GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
 
@@ -567,8 +587,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Value from swap or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key,
-        final int part)
+    @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, final int part)
         throws IgniteCheckedException {
         if (!swapEnabled)
             return null;
@@ -582,6 +601,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onSwapRead(rmv != null);
+
                 if (rmv != null) {
                     try {
                         GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
@@ -611,6 +633,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                                 null);
                         }
 
+                        if (cctx.config().isStatisticsEnabled())
+                            cctx.cache().metrics0().onSwapRemove();
+
                         // Always fire this event, since preloading depends on it.
                         onUnswapped(part, key, entry);
 
@@ -649,12 +674,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (!offheapEnabled && !swapEnabled)
             return null;
 
-        return read(entry.key(),
-            entry.key().valueBytes(cctx.cacheObjectContext()),
-            entry.partition(),
-            locked,
-            readOffheap,
-            readSwap);
+        return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
+            readOffheap, readSwap);
     }
 
     /**
@@ -730,6 +751,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         final GridCacheQueryManager qryMgr = cctx.queries();
 
         Collection<SwapKey> unprocessedKeys = null;
+
         final Collection<GridCacheBatchSwapEntry> res = new ArrayList<>(keys.size());
 
         // First try removing from offheap.
@@ -737,8 +759,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             for (KeyCacheObject key : keys) {
                 int part = cctx.affinity().partition(key);
 
-                byte[] entryBytes =
-                    offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                if(entryBytes != null && cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
 
                 if (entryBytes != null) {
                     GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
@@ -848,6 +872,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                                     null);
                             }
 
+                            if (cctx.config().isStatisticsEnabled())
+                                cctx.cache().metrics0().onSwapRemove();
+
                             // Always fire this event, since preloading depends on it.
                             onUnswapped(swapKey.partition(), key, entry);
 
@@ -880,7 +907,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        return offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+        boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+        if(rmv && cctx.config().isStatisticsEnabled())
+            cctx.cache().metrics0().onOffHeapRemove();
+
+        return rmv;
     }
 
     /**
@@ -925,6 +957,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     return;
 
                 try {
+                    if (cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onSwapRemove();
+
                     GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
 
                     if (entry == null)
@@ -942,11 +977,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         // First try offheap.
         if (offheapEnabled) {
-            byte[] val = offheap.remove(spaceName,
-                part,
-                key.value(cctx.cacheObjectContext(), false),
+            byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false),
                 key.valueBytes(cctx.cacheObjectContext()));
 
+            if(val != null && cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapRemove();
+
             if (val != null) {
                 if (c != null)
                     c.apply(val); // Probably we should read value and apply closure before removing...
@@ -1007,6 +1043,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             offheap.put(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), entry.marshal());
 
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onOffHeapWrite();
+
             if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
                 cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null,
                     EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
@@ -1035,11 +1074,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         if (offheapEnabled) {
             for (GridCacheBatchSwapEntry swapEntry : swapped) {
-                offheap.put(spaceName,
-                    swapEntry.partition(),
-                    swapEntry.key(),
-                    swapEntry.key().valueBytes(cctx.cacheObjectContext()),
-                    swapEntry.marshal());
+                offheap.put(spaceName, swapEntry.partition(), swapEntry.key(),
+                    swapEntry.key().valueBytes(cctx.cacheObjectContext()), swapEntry.marshal());
+
+                if (cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapWrite();
 
                 if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP))
                     cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
@@ -1071,6 +1110,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                         qryMgr.onSwap(batchSwapEntry.key());
                 }
             }
+
+            if (cctx.config().isStatisticsEnabled())
+                cctx.cache().metrics0().onSwapWrite(batch.size());
         }
     }
 
@@ -1082,17 +1124,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param entry Entry bytes.
      * @throws IgniteCheckedException If failed.
      */
-    private void writeToSwap(int part,
-        KeyCacheObject key,
-        byte[] entry)
-        throws IgniteCheckedException
-    {
+    private void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException {
         checkIteratorQueue();
 
         swapMgr.write(spaceName,
             new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())),
-            entry,
-            cctx.deploy().globalLoader());
+            entry, cctx.deploy().globalLoader());
+
+        if (cctx.config().isStatisticsEnabled())
+            cctx.cache().metrics0().onSwapWrite();
 
         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_SWAPPED))
             cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid) null, null,
@@ -1274,7 +1314,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     int part = cctx.affinity().partition(key);
 
-                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                    boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                    if(rmv && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapRemove();
                 }
                 else
                     it.removeX();
@@ -1432,6 +1475,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 return it.hasNext();
             }
 
+            @SuppressWarnings("unchecked")
             @Override protected void onRemove() throws IgniteCheckedException {
                 if (cur == null)
                     throw new IllegalStateException("Method next() has not yet been called, or the remove() method " +
@@ -1616,7 +1660,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     int part = cctx.affinity().partition(key);
 
-                    offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                    boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                    if(rmv && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onOffHeapRemove();
                 }
 
                 @Override protected void onClose() throws IgniteCheckedException {
@@ -1646,7 +1693,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 int part = cctx.affinity().partition(key);
 
-                offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+                boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()));
+
+                if(rmv && cctx.config().isStatisticsEnabled())
+                    cctx.cache().metrics0().onOffHeapRemove();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5f9049a..9bd6321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
+        boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
+            !cctx.config().isEagerTtl() ||
+            CU.isAtomicsCache(cctx.name()) ||
+            CU.isMarshallerCache(cctx.name()) ||
+            CU.isUtilityCache(cctx.name()) ||
+            (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
+
+        if (cleanupDisabled)
             return;
 
         cleanupWorker = new CleanupWorker();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 549f42f..3bd2a45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
@@ -34,12 +35,14 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.plugin.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.*;
+import javax.cache.configuration.*;
 import javax.cache.expiry.*;
 import javax.cache.integration.*;
 import java.io.*;
@@ -114,13 +117,6 @@ public class GridCacheUtils {
             }
         };
 
-    /** Not evicted partitions. */
-    private static final IgnitePredicate PART_NOT_EVICTED = new P1<GridDhtLocalPartition>() {
-        @Override public boolean apply(GridDhtLocalPartition p) {
-            return p.state() != GridDhtPartitionState.EVICTED;
-        }
-    };
-
     /** */
     private static final IgniteClosure<Integer, GridCacheVersion[]> VER_ARR_FACTORY =
         new C1<Integer, GridCacheVersion[]>() {
@@ -398,30 +394,11 @@ public class GridCacheUtils {
      * @return Partition to state transformer.
      */
     @SuppressWarnings({"unchecked"})
-    public static <K, V> IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state() {
+    public static IgniteClosure<GridDhtLocalPartition, GridDhtPartitionState> part2state() {
         return PART2STATE;
     }
 
     /**
-     * @return Not evicted partitions.
-     */
-    @SuppressWarnings( {"unchecked"})
-    public static <K, V> IgnitePredicate<GridDhtLocalPartition> notEvicted() {
-        return PART_NOT_EVICTED;
-    }
-
-    /**
-     * Gets all nodes on which cache with the same name is started.
-     *
-     * @param ctx Cache context.
-     * @return All nodes on which cache with the same name is started (including nodes
-     *      that may have already left).
-     */
-    public static Collection<ClusterNode> allNodes(GridCacheContext ctx) {
-        return allNodes(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
      * Gets all nodes on which cache with the same name is started.
      *
      * @param ctx Cache context.
@@ -446,59 +423,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets alive nodes.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
-     */
-    public static Collection<ClusterNode> aliveNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveCacheNodes(ctx.namex(), topOrder);
-    }
-
-    /**
-     * Gets remote nodes on which cache with the same name is started.
-     *
-     * @param ctx Cache context.
-     * @return Remote nodes on which cache with the same name is started.
-     */
-    public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx) {
-        return remoteNodes(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets remote node with at least one cache configured.
-     *
-     * @param ctx Shared cache context.
-     * @return Collection of nodes with at least one cache configured.
-     */
-    public static Collection<ClusterNode> remoteNodes(GridCacheSharedContext ctx) {
-        return remoteNodes(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets remote nodes on which cache with the same name is started.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Remote nodes on which cache with the same name is started.
-     */
-    public static Collection<ClusterNode> remoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().remoteCacheNodes(ctx.namex(), topOrder);
-    }
-
-    /**
-     * Gets alive nodes.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
-     */
-    public static Collection<ClusterNode> aliveRemoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveRemoteCacheNodes(ctx.namex(), topOrder);
-    }
-
-    /**
      * Gets remote nodes with at least one cache configured.
      *
      * @param ctx Cache shared context.
@@ -510,25 +434,15 @@ public class GridCacheUtils {
     }
 
     /**
-     * Gets alive nodes with at least one cache configured.
-     *
-     * @param ctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Affinity nodes.
-     */
-    public static Collection<ClusterNode> aliveCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveNodesWithCaches(topOrder);
-    }
-
-    /**
      * Gets alive remote nodes with at least one cache configured.
      *
      * @param ctx Cache context.
      * @param topOrder Maximum allowed node order.
      * @return Affinity nodes.
      */
-    public static Collection<ClusterNode> aliveRemoteCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) {
-        return ctx.discovery().aliveRemoteNodesWithCaches(topOrder);
+    public static Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx,
+        AffinityTopologyVersion topOrder) {
+        return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder);
     }
 
     /**
@@ -577,90 +491,34 @@ public class GridCacheUtils {
     }
 
     /**
-     * Checks if given node has specified cache started.
-     *
-     * @param cacheName Cache name.
-     * @param node Node to check.
-     * @return {@code True} if given node has specified cache started.
-     */
-    public static boolean cacheNode(String cacheName, ClusterNode node) {
-        return cacheNode(cacheName, (GridCacheAttributes[])node.attribute(ATTR_CACHE));
-    }
-
-    /**
-     * Checks if given attributes relate the the node which has (or had) specified cache started.
-     *
-     * @param cacheName Cache name.
-     * @param caches Node cache attributes.
-     * @return {@code True} if given node has specified cache started.
-     */
-    public static boolean cacheNode(String cacheName, GridCacheAttributes[] caches) {
-        if (caches != null)
-            for (GridCacheAttributes attrs : caches)
-                if (F.eq(cacheName, attrs.cacheName()))
-                    return true;
-
-        return false;
-    }
-
-    /**
-     * Gets oldest alive node for specified topology version.
-     *
-     * @param cctx Cache context.
-     * @return Oldest node for the current topology version.
-     */
-    public static ClusterNode oldest(GridCacheContext cctx) {
-        return oldest(cctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets oldest alive node across nodes with at least one cache configured.
-     *
-     * @param ctx Cache context.
-     * @return Oldest node.
-     */
-    public static ClusterNode oldest(GridCacheSharedContext ctx) {
-        return oldest(ctx, AffinityTopologyVersion.NONE);
-    }
-
-    /**
-     * Gets oldest alive node for specified topology version.
+     * Gets oldest alive server node with at least one cache configured for specified topology version.
      *
-     * @param cctx Cache context.
-     * @param topOrder Maximum allowed node order.
-     * @return Oldest node for the given topology version.
+     * @param ctx Context.
+     * @param topVer Maximum allowed topology version.
+     * @return Oldest alive cache server node.
      */
-    public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) {
-        ClusterNode oldest = null;
+    @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx,
+        AffinityTopologyVersion topVer) {
+        Collection<ClusterNode> nodes = ctx.discovery().aliveServerNodesWithCaches(topVer);
 
-        for (ClusterNode n : aliveNodes(cctx, topOrder))
-            if (oldest == null || n.order() < oldest.order())
-                oldest = n;
-
-        assert oldest != null : "Failed to find oldest node for cache context [name=" + cctx.name() + ", topOrder=" + topOrder + ']';
-        assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder);
+        if (nodes.isEmpty())
+            return null;
 
-        return oldest;
+        return oldest(nodes);
     }
 
     /**
-     * Gets oldest alive node with at least one cache configured for specified topology version.
-     *
-     * @param cctx Shared cache context.
-     * @param topOrder Maximum allowed node order.
+     * @param nodes Nodes.
      * @return Oldest node for the given topology version.
      */
-    public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) {
+    @Nullable public static ClusterNode oldest(Collection<ClusterNode> nodes) {
         ClusterNode oldest = null;
 
-        for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) {
+        for (ClusterNode n : nodes) {
             if (oldest == null || n.order() < oldest.order())
                 oldest = n;
         }
 
-        assert oldest != null : "Failed to find oldest node with caches: " + topOrder;
-        assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder);
-
         return oldest;
     }
 
@@ -718,30 +576,6 @@ public class GridCacheUtils {
     }
 
     /**
-     * @return Closure that converts tx entry to key.
-     */
-    @SuppressWarnings({"unchecked"})
-    public static <K, V> IgniteClosure<IgniteTxEntry, K> tx2key() {
-        return (IgniteClosure<IgniteTxEntry, K>)tx2key;
-    }
-
-    /**
-     * @return Closure that converts tx entry collection to key collection.
-     */
-    @SuppressWarnings({"unchecked"})
-    public static <K, V> IgniteClosure<Collection<IgniteTxEntry>, Collection<K>> txCol2Key() {
-        return (IgniteClosure<Collection<IgniteTxEntry>, Collection<K>>)txCol2key;
-    }
-
-    /**
-     * @return Converts transaction entry to cache entry.
-     */
-    @SuppressWarnings( {"unchecked"})
-    public static <K, V> IgniteClosure<IgniteTxEntry, GridCacheEntryEx> tx2entry() {
-        return (IgniteClosure<IgniteTxEntry, GridCacheEntryEx>)tx2entry;
-    }
-
-    /**
      * @return Closure which converts transaction entry xid to XID version.
      */
     @SuppressWarnings( {"unchecked"})
@@ -1451,13 +1285,7 @@ public class GridCacheUtils {
     }
 
     /**
-     * @return Cache ID for utility cache.
-     */
-    public static int utilityCacheId() {
-        return cacheId(UTILITY_CACHE_NAME);
-    }
-
-    /**
+     * @param cacheName Cache name.
      * @return Cache ID.
      */
     public static int cacheId(String cacheName) {
@@ -1688,7 +1516,7 @@ public class GridCacheUtils {
     /**
      * @param aff Affinity.
      * @param n Node.
-     * @return Predicate that evaulates to {@code true} if entry is primary for node.
+     * @return Predicate that evaluates to {@code true} if entry is primary for node.
      */
     public static CacheEntryPredicate cachePrimary(
         final Affinity aff,
@@ -1790,4 +1618,76 @@ public class GridCacheUtils {
 
         return res;
     }
+
+    /**
+     * @param node Node.
+     * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
+     */
+    public static boolean clientNode(ClusterNode node) {
+        Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
+
+        assert clientModeAttr != null : node;
+
+        return clientModeAttr != null && clientModeAttr;
+    }
+
+    /**
+     * @param node Node.
+     * @param filter Node filter.
+     * @return {@code True} if node is not a client node and passes the given filter.
+     */
+    public static boolean affinityNode(ClusterNode node, IgnitePredicate<ClusterNode> filter) {
+        return !clientNode(node) && filter.apply(node);
+    }
+
+    /**
+     * Creates and starts store session listeners.
+     *
+     * @param ctx Kernal context.
+     * @param factories Factories.
+     * @return Listeners.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public static Collection<CacheStoreSessionListener> startStoreSessionListeners(GridKernalContext ctx,
+        Factory<CacheStoreSessionListener>[] factories) throws IgniteCheckedException {
+        if (factories == null)
+            return null;
+
+        Collection<CacheStoreSessionListener> lsnrs = new ArrayList<>(factories.length);
+
+        for (Factory<CacheStoreSessionListener> factory : factories) {
+            CacheStoreSessionListener lsnr = factory.create();
+
+            if (lsnr != null) {
+                ctx.resource().injectGeneric(lsnr);
+
+                if (lsnr instanceof LifecycleAware)
+                    ((LifecycleAware)lsnr).start();
+
+                lsnrs.add(lsnr);
+            }
+        }
+
+        return lsnrs;
+    }
+
+    /**
+     * Stops store session listeners.
+     *
+     * @param ctx Kernal context.
+     * @param sesLsnrs Session listeners.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public static void stopStoreSessionListeners(GridKernalContext ctx, Collection<CacheStoreSessionListener> sesLsnrs)
+        throws IgniteCheckedException {
+        if (sesLsnrs == null)
+            return;
+
+        for (CacheStoreSessionListener lsnr : sesLsnrs) {
+            if (lsnr instanceof LifecycleAware)
+                ((LifecycleAware)lsnr).stop();
+
+            ctx.resource().cleanupGeneric(lsnr);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f840015..4390993 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -699,6 +699,29 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+        try {
+            CacheOperationContext prev = onEnter(opCtx);
+
+            try {
+                if (isAsync()) {
+                    setFuture(delegate.getAllOutTxAsync(keys));
+
+                    return null;
+                }
+                else
+                    return delegate.getAllOutTx(keys);
+            }
+            finally {
+                onLeave(prev);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
     /**
      * @param keys Keys.
      * @return Values map.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 5184115..9972f92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -775,6 +775,11 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public Set<K> keySet();
 
     /**
+     * @return Set of keys including internal keys.
+     */
+    public Set<K> keySetx();
+
+    /**
      * Set of keys for which this node is primary.
      * This set is dynamic and may change with grid topology changes.
      * Note that this set will contain mappings for all keys, even if their values are
@@ -1130,11 +1135,9 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public IgniteInternalFuture<Boolean> removeAsync(K key, V val);
 
     /**
-     * Removes given key mappings from cache for entries for which the optionally passed in filters do
-     * pass.
+     * Removes given key mappings from cache.
      * <p>
-     * If write-through is enabled, the values will be removed from {@link CacheStore}
-     * via <code>@link CacheStore#removeAll(Transaction, Collection)</code> method.
+     * If write-through is enabled, the values will be removed from {@link CacheStore} via {@link IgniteDataStreamer}.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing transaction
      * if there is one.
@@ -1145,11 +1148,9 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public void removeAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException;
 
     /**
-     * Asynchronously removes given key mappings from cache for entries for which the optionally
-     * passed in filters do pass.
+     * Asynchronously removes given key mappings from cache.
      * <p>
-     * If write-through is enabled, the values will be removed from {@link CacheStore}
-     * via <code>@link CacheStore#removeAll(Transaction, Collection)</code> method.
+     * If write-through is enabled, the values will be removed from {@link CacheStore} via {@link IgniteDataStreamer}.
      * <h2 class="header">Transactions</h2>
      * This method is transactional and will enlist the entry into ongoing transaction
      * if there is one.
@@ -1161,20 +1162,13 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
     public IgniteInternalFuture<?> removeAllAsync(@Nullable Collection<? extends K> keys);
 
     /**
-     * Removes mappings from cache for entries for which the optionally passed in filters do
-     * pass. If passed in filters are {@code null}, then all entries in cache will be enrolled
-     * into transaction.
+     * Removes mappings from cache.
      * <p>
-     * <b>USE WITH CARE</b> - if your cache has many entries that pass through the filter or if filter
-     * is empty, then transaction will quickly become very heavy and slow. Also, locks
-     * are acquired in undefined order, so it may cause a deadlock when used with
-     * other concurrent transactional updates.
+     * <b>USE WITH CARE</b> - if your cache has many entries then transaction will quickly become very heavy and slow.
      * <p>
-     * If write-through is enabled, the values will be removed from {@link CacheStore}
-     * via <code>@link CacheStore#removeAll(Transaction, Collection)</code> method.
+     * If write-through is enabled, the values will be removed from {@link CacheStore} via {@link IgniteDataStreamer}.
      * <h2 class="header">Transactions</h2>
-     * This method is transactional and will enlist the entry into ongoing transaction
-     * if there is one.
+     * This method is not transactional.
      *
      * @throws IgniteCheckedException If remove failed.
      */
@@ -1618,7 +1612,16 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
      * @return Value.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException;
+    public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException;
+
+    /**
+     * Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries
+     * and will not lock any keys if pessimistic transaction is started by thread.
+     *
+     * @param keys Keys to get values for.
+     * @return Future for getAllOutTx operation.
+     */
+    public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys);
 
     /**
      * Checks whether this cache is IGFS data cache.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index 61ca882..e5fa891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -23,7 +23,7 @@ import org.jetbrains.annotations.*;
 /**
  *
  */
-public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheObject, Comparable<KeyCacheObjectImpl> {
+public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheObject {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -46,15 +46,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public int compareTo(KeyCacheObjectImpl other) {
-        assert val instanceof Comparable : val;
-        assert other.val instanceof Comparable : val;
-
-        return ((Comparable)val).compareTo(other.val);
-    }
-
-    /** {@inheritDoc} */
     @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException {
         if (valBytes == null)
             valBytes = ctx.processor().marshal(ctx, val);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
index 0186a90..0790052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java
@@ -84,9 +84,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
     @Override public int[] primaryPartitions(ClusterNode n) {
         A.notNull(n, "n");
 
-        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
-
-        Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topVer);
+        Set<Integer> parts = cctx.affinity().primaryPartitions(n.id(), topologyVersion());
 
         return U.toIntArray(parts);
     }
@@ -95,9 +93,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
     @Override public int[] backupPartitions(ClusterNode n) {
         A.notNull(n, "n");
 
-        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
-
-        Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topVer);
+        Set<Integer> parts = cctx.affinity().backupPartitions(n.id(), topologyVersion());
 
         return U.toIntArray(parts);
     }
@@ -108,7 +104,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> {
 
         Collection<Integer> parts = new HashSet<>();
 
-        AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+        AffinityTopologyVersion topVer = topologyVersion();
 
         for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) {
             for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index fa8d192..b5c5161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -218,7 +218,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
                         }
                     },
                     new QueueHeaderPredicate(),
-                    cctx.isLocal() || cctx.isReplicated(),
+                    cctx.isLocal() || (cctx.isReplicated() && cctx.affinityNode()),
                     true);
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index b79f9d5..bd72764 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -327,13 +327,6 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry {
     }
 
     /**
-     *
-     */
-    public void onUnlock() {
-        // No-op.
-    }
-
-    /**
      * Unlocks local lock.
      *
      * @return Removed candidate, or <tt>null</tt> if thread still holds the lock.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index fded3c9..bd1dedf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable {
     /** {@code True} if mapping is for near caches, {@code false} otherwise. */
     private boolean near;
 
+    /** {@code True} if this is first mapping for optimistic tx on client node. */
+    private boolean clientFirst;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable {
     }
 
     /**
+     * @return {@code True} if this is first mapping for optimistic tx on client node.
+     */
+    public boolean clientFirst() {
+        return clientFirst;
+    }
+
+    /**
+     * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node.
+     */
+    public void clientFirst(boolean clientFirst) {
+        this.clientFirst = clientFirst;
+    }
+
+    /**
      * @return {@code True} if mapping is for near caches, {@code false} otherwise.
      */
     public boolean near() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 331de4e..c3f3e7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -210,7 +210,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
                 removeNode(exchId.nodeId());
 
             // In case if node joins, get topology at the time of joining node.
-            ClusterNode oldest = CU.oldest(cctx, topVer);
+            ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
+
+            assert oldest != null;
 
             if (log.isDebugEnabled())
                 log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
@@ -218,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             long updateSeq = this.updateSeq.incrementAndGet();
 
             // If this is the oldest node.
-            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) {
+            if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
                 if (node2part == null) {
                     node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
@@ -665,7 +667,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId.equals(cctx.localNodeId());
 
         // In case if node joins, get topology at the time of joining node.
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
 
         // If this node became the oldest node.
         if (oldest.id().equals(cctx.localNodeId())) {
@@ -715,7 +717,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         assert nodeId != null;
         assert lock.writeLock().isHeldByCurrentThread();
 
-        ClusterNode oldest = CU.oldest(cctx, topVer);
+        ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer);
 
         ClusterNode loc = cctx.localNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
index 303d649..7bae7f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java
@@ -96,12 +96,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter<List<List<Cl
 
     /**
      * @param node Node.
-     * @param res Reponse.
+     * @param res Response.
      */
     public void onResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) {
         if (!res.topologyVersion().equals(topVer)) {
             if (log.isDebugEnabled())
-                log.debug("Received affinity assignment for wrong topolgy version (will ignore) " +
+                log.debug("Received affinity assignment for wrong topology version (will ignore) " +
                     "[node=" + node + ", res=" + res + ", topVer=" + topVer + ']');
 
             return;



Mime
View raw message