ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [48/50] [abbrv] ignite git commit: Merge 2.0
Date Wed, 15 Feb 2017 10:45:13 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6dc1d04,942ae21..81296e1
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -98,6 -100,15 +98,9 @@@ public abstract class GridCacheMapEntr
      private static final byte IS_UNSWAPPED_MASK = 0x02;
  
      /** */
 -    private static final byte IS_OFFHEAP_PTR_MASK = 0x04;
 -
 -    /** */
 -    private static final byte IS_SWAPPING_REQUIRED = 0x08;
 -
 -    /** */
 -    private static final byte IS_EVICT_DISABLED = 0x10;
++    private static final byte IS_EVICT_DISABLED = 0x04;
+ 
+     /** */
      public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator();
  
      /**
@@@ -478,20 -783,50 +483,44 @@@
      }
  
      /** {@inheritDoc} */
-     @Nullable @Override public T2<CacheObject, GridCacheVersion> innerGetVersioned(
 -    @Override public EntryGetResult innerGetAndReserveForLoad(boolean readSwap,
 -        boolean updateMetrics,
++    @Override public EntryGetResult innerGetAndReserveForLoad(boolean updateMetrics,
+         boolean evt,
+         UUID subjId,
+         String taskName,
+         @Nullable IgniteCacheExpiryPolicy expiryPlc,
+         boolean keepBinary,
+         @Nullable ReaderArguments readerArgs) throws IgniteCheckedException, GridCacheEntryRemovedException {
+         return (EntryGetResult)innerGet0(
+             /*ver*/null,
+             /*tx*/null,
 -            readSwap,
+             /*readThrough*/false,
+             evt,
+             updateMetrics,
 -            /*tmp*/false,
+             subjId,
+             /*transformClo*/null,
+             taskName,
+             expiryPlc,
+             true,
+             keepBinary,
+             /*reserve*/true,
+             readerArgs);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public EntryGetResult innerGetVersioned(
          @Nullable GridCacheVersion ver,
          IgniteInternalTx tx,
 -        boolean readSwap,
 -        boolean unmarshal,
          boolean updateMetrics,
          boolean evt,
          UUID subjId,
          Object transformClo,
          String taskName,
          @Nullable IgniteCacheExpiryPolicy expiryPlc,
-         boolean keepBinary)
+         boolean keepBinary,
+         @Nullable ReaderArguments readerArgs)
          throws IgniteCheckedException, GridCacheEntryRemovedException {
-         return (T2<CacheObject, GridCacheVersion>)innerGet0(
-             ver,
+         return (EntryGetResult)innerGet0(ver,
              tx,
 -            readSwap,
              false,
              evt,
              updateMetrics,
@@@ -619,14 -966,28 +653,27 @@@
  
              // Cache version for optimistic check.
              startVer = ver;
-         }
+ 
+             addReaderIfNeed(readerArgs);
  
 -            if (ret != null) {
 -                assert tmp || !(ret instanceof BinaryObjectOffheapImpl);
 -                assert !obsolete;
 -                assert !deferred;
 +        if (ret != null) {
 +            assert !obsolete;
 +            assert !deferred;
  
-             // If return value is consistent, then done.
-             return retVer ? new T2<>(ret, resVer) : ret;
+                 // If return value is consistent, then done.
+                 res = retVer ? new EntryGetResult(ret, resVer, false) : ret;
+             }
+             else if (reserveForLoad && !obsolete) {
+                 assert !readThrough;
+                 assert retVer;
+ 
+                 boolean reserve = !evictionDisabled();
+ 
+                 if (reserve)
+                     flags |= IS_EVICT_DISABLED;
+ 
+                 res = new EntryGetResult(null, resVer, reserve);
+             }
          }
  
          if (obsolete) {
@@@ -677,8 -1047,20 +729,10 @@@
  
                      update(ret, expTime, ttl, nextVer, true);
  
 -                    if (hadValPtr && cctx.offheapTiered()) {
 -                        if (log.isTraceEnabled()) {
 -                            log.trace("innerGet removeOffheap [key=" + key +
 -                                ", entry=" + System.identityHashCode(this) +
 -                                ", ptr=" + offHeapPointer() + ']');
 -                        }
 -
 -                        cctx.swap().removeOffheap(key);
 -                    }
 -
                      if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
                          deletedUnlocked(false);
+ 
+                     assert readerArgs == null;
                  }
  
                  if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
@@@ -2593,8 -3064,21 +2647,9 @@@
              value(null);
  
              ver = newVer;
+             flags &= ~IS_EVICT_DISABLED;
  
 -            if (log.isTraceEnabled()) {
 -                log.trace("invalidate releaseSwap [key=" + key +
 -                    ", entry=" + System.identityHashCode(this) +
 -                    ", val=" + val +
 -                    ", ptr=" + offHeapPointer() +
 -                    ']');
 -            }
 -
 -            releaseSwap();
 -
 -            clearIndex(val);
 +            removeValue();
  
              onInvalidate();
          }
@@@ -2672,9 -3155,10 +2727,10 @@@
          ttlAndExpireTimeExtras(ttl, expireTime);
  
          this.ver = ver;
+         flags &= ~IS_EVICT_DISABLED;
  
 -        if (addTracked && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()) && cctx.config().isEagerTtl())
 -            cctx.ttl().addTrackedEntry(this);
 +        if (trackNear && expireTime != 0 && (expireTime != oldExpireTime || isStartVersion()))
 +            cctx.ttl().addTrackedEntry((GridNearCacheEntry)this);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 987d696,0ce4404..0456539
mode 100644,100755..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -585,146 -616,99 +593,144 @@@ public class GridCacheProcessor extend
  
          CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
  
 -        sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx,
 -            ctx.config().getCacheStoreSessionListenerFactories()));
 +        registerCacheFromConfig(cfgs);
 +
 +        registerCacheFromPersistentStore(cfgs);
 +
 +        if (log.isDebugEnabled())
 +            log.debug("Started cache processor.");
 +    }
  
 +    /**
 +     * @param cfgs Cache configurations.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    private void registerCacheFromConfig(CacheConfiguration[] cfgs) throws IgniteCheckedException {
          for (int i = 0; i < cfgs.length; i++) {
-             if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName()))
+             if (ctx.config().isDaemon())
                  continue;
  
 -            cloneCheckSerializable(cfgs[i]);
 -
              CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
  
 -            CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
 +            cfgs[i] = cfg; // Replace original configuration value.
  
 -            // Initialize defaults.
 -            initialize(cfg, cacheObjCtx);
 +            registerCache(cfg);
 +        }
 +    }
  
 -            cfgs[i] = cfg; // Replace original configuration value.
 +    /**
 +     * @param cfgs Cache configurations.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    private void registerCacheFromPersistentStore(CacheConfiguration[] cfgs) throws IgniteCheckedException {
 +        if (sharedCtx.pageStore() != null &&
 +            sharedCtx.database().persistenceEnabled() &&
 +            !ctx.config().isDaemon()) {
  
 -            String masked = maskNull(cfg.getName());
 +            Set<String> savedCacheNames = sharedCtx.pageStore().savedCacheNames();
  
 -            if (registeredCaches.containsKey(masked)) {
 -                String cacheName = cfg.getName();
 +            for (CacheConfiguration cfg : cfgs)
 +                savedCacheNames.remove(cfg.getName());
  
 -                if (cacheName != null)
 -                    throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
 -                        "assign unique name to each cache): " + U.maskName(cacheName));
 -                else
 -                    throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
 -                        "assign unique name to each cache).");
 +            for (String name : internalCaches)
 +                savedCacheNames.remove(name);
 +
 +            if (!F.isEmpty(savedCacheNames)) {
 +                log.info("Registrate persistent caches: " + savedCacheNames);
 +
 +                for (String name : savedCacheNames) {
 +                    CacheConfiguration cfg = sharedCtx.pageStore().readConfiguration(name);
 +
 +                    if (cfg != null)
 +                        registerCache(cfg);
 +                }
              }
 +        }
 +    }
  
 -            CacheType cacheType;
 +    /**
 +     * @param cfg Cache configuration.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    private void registerCache(CacheConfiguration<?, ?> cfg) throws IgniteCheckedException {
 +        cloneCheckSerializable(cfg);
  
 -            if (CU.isUtilityCache(cfg.getName()))
 -                cacheType = CacheType.UTILITY;
 -            else if (internalCaches.contains(maskNull(cfg.getName())))
 -                cacheType = CacheType.INTERNAL;
 -            else
 -                cacheType = CacheType.USER;
 +        CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
  
 -            boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
 +        // Initialize defaults.
 +        initialize(cfg, cacheObjCtx);
  
 -            DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx, cfg, cacheType, template,
 -                IgniteUuid.randomUuid());
 +        String masked = maskNull(cfg.getName());
  
 -            desc.locallyConfigured(true);
 -            desc.staticallyConfigured(true);
 -            desc.receivedFrom(ctx.localNodeId());
 +        if (registeredCaches.containsKey(masked)) {
 +            String cacheName = cfg.getName();
  
 -            if (!template) {
 -                registeredCaches.put(masked, desc);
 +            if (cacheName != null)
 +                throw new IgniteCheckedException("Duplicate cache name found (check configuration and " +
 +                    "assign unique name to each cache): " + U.maskName(cacheName));
 +            else
 +                throw new IgniteCheckedException("Default cache has already been configured (check configuration and " +
 +                    "assign unique name to each cache).");
 +        }
  
 -                ctx.discovery().setCacheFilter(
 -                    cfg.getName(),
 -                    cfg.getNodeFilter(),
 -                    cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
 -                    cfg.getCacheMode());
 +        CacheType cacheType;
  
 -                ctx.discovery().addClientNode(cfg.getName(),
 -                    ctx.localNodeId(),
 -                    cfg.getNearConfiguration() != null);
 +        if (CU.isUtilityCache(cfg.getName()))
 +            cacheType = CacheType.UTILITY;
-         else if (CU.isMarshallerCache(cfg.getName()))
-             cacheType = CacheType.MARSHALLER;
 +        else if (internalCaches.contains(maskNull(cfg.getName())))
 +            cacheType = CacheType.INTERNAL;
 +        else
 +            cacheType = CacheType.USER;
  
 -                if (!cacheType.userCache())
 -                    stopSeq.addLast(cfg.getName());
 -                else
 -                    stopSeq.addFirst(cfg.getName());
 -            }
 -            else {
 -                if (log.isDebugEnabled())
 -                    log.debug("Use cache configuration as template: " + cfg);
 +        boolean template = cfg.getName() != null && cfg.getName().endsWith("*");
  
 -                registeredTemplates.put(masked, desc);
 -            }
 +        DynamicCacheDescriptor desc = new DynamicCacheDescriptor(ctx,
 +            cfg,
 +            cacheType,
 +            template,
 +            IgniteUuid.randomUuid());
  
 -            if (cfg.getName() == null) { // Use cache configuration with null name as template.
 -                DynamicCacheDescriptor desc0 =
 -                    new DynamicCacheDescriptor(ctx, cfg, cacheType, true, IgniteUuid.randomUuid());
 +        desc.locallyConfigured(true);
 +        desc.staticallyConfigured(true);
 +        desc.receivedFrom(ctx.localNodeId());
  
 -                desc0.locallyConfigured(true);
 -                desc0.staticallyConfigured(true);
 +        if (!template) {
 +            registeredCaches.put(masked, desc);
  
 -                registeredTemplates.put(masked, desc0);
 -            }
 +            ctx.discovery().setCacheFilter(
 +                cfg.getName(),
 +                cfg.getNodeFilter(),
 +                cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED,
 +                cfg.getCacheMode());
 +
 +            ctx.discovery().addClientNode(cfg.getName(),
 +                ctx.localNodeId(),
 +                cfg.getNearConfiguration() != null);
 +
 +            if (!cacheType.userCache())
 +                stopSeq.addLast(cfg.getName());
 +            else
 +                stopSeq.addFirst(cfg.getName());
          }
 +        else {
 +            if (log.isDebugEnabled())
 +                log.debug("Use cache configuration as template: " + cfg);
  
 -        // Start shared managers.
 -        for (GridCacheSharedManager mgr : sharedCtx.managers())
 -            mgr.start(sharedCtx);
 +            registeredTemplates.put(masked, desc);
 +        }
  
 -        transactions = new IgniteTransactionsImpl(sharedCtx);
 +        if (cfg.getName() == null) { // Use cache configuration with null name as template.
 +            DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx,
 +                cfg,
 +                cacheType,
 +                true,
 +                IgniteUuid.randomUuid());
  
 -        if (log.isDebugEnabled())
 -            log.debug("Started cache processor.");
 +            desc0.locallyConfigured(true);
 +            desc0.staticallyConfigured(true);
 +
 +            registeredTemplates.put(masked, desc0);
 +        }
      }
  
      /**
@@@ -776,27 -769,11 +782,27 @@@
                          }
                      }
                  }
 +
 +                if (!tmpCacheCfg.isEmpty()) {
 +                    CacheConfiguration[] newCacheCfg = new CacheConfiguration[tmpCacheCfg.size()];
 +
 +                    tmpCacheCfg.toArray(newCacheCfg);
 +
 +                    ctx.config().setCacheConfiguration(newCacheCfg);
 +                }
 +
 +                activeOnStart = currStatus;
              }
  
 +            if (activeOnStart && !ctx.clientNode() && !ctx.isDaemon())
 +                sharedCtx.database().lock();
 +
 +            // Must start database before start first cache.
 +            sharedCtx.database().onKernalStart(false);
 +
              // Start dynamic caches received from collect discovery data.
              for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                 if (ctx.config().isDaemon() && !CU.isMarshallerCache(desc.cacheConfiguration().getName()))
+                 if (ctx.config().isDaemon())
                      continue;
  
                  desc.clearRemoteConfigurations();
@@@ -878,50 -848,34 +882,60 @@@
              }
          }
  
-         assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started";
          assert ctx.config().isDaemon() || caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started";
+ 
+         if (!ctx.clientNode() && !ctx.isDaemon())
+             addRemovedItemsCleanupTask(Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000));
+ 
+     }
+ 
+     /**
+      * @param timeout Cleanup timeout.
+      */
+     private void addRemovedItemsCleanupTask(long timeout) {
+         ctx.timeout().addTimeoutObject(new RemovedItemsCleanupTask(timeout));
      }
  
 -    /** {@inheritDoc} */
 -    @SuppressWarnings("unchecked")
 -    @Override public void stop(boolean cancel) throws IgniteCheckedException {
 -        for (String cacheName : stopSeq) {
 -            GridCacheAdapter<?, ?> cache = stoppedCaches.remove(maskNull(cacheName));
 +    /**
 +     *
 +     */
 +    private void checkConsistency() throws IgniteCheckedException {
 +        if (!ctx.config().isDaemon() && !getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
 +            for (ClusterNode n : ctx.discovery().remoteNodes()) {
 +                if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED))
 +                    continue;
  
 -            if (cache != null)
 -                stopCache(cache, cancel);
 -        }
 +                checkTransactionConfiguration(n);
  
 -        for (GridCacheAdapter<?, ?> cache : stoppedCaches.values()) {
 -            if (cache == stoppedCaches.remove(maskNull(cache.name())))
 -                stopCache(cache, cancel);
 +                DeploymentMode locDepMode = ctx.config().getDeploymentMode();
 +                DeploymentMode rmtDepMode = n.attribute(IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE);
 +
 +                CU.checkAttributeMismatch(
 +                    log, null, n.id(), "deploymentMode", "Deployment mode",
 +                    locDepMode, rmtDepMode, true);
 +
 +                for (DynamicCacheDescriptor desc : registeredCaches.values()) {
 +                    CacheConfiguration rmtCfg = desc.remoteConfiguration(n.id());
 +
 +                    if (rmtCfg != null) {
 +                        CacheConfiguration locCfg = desc.cacheConfiguration();
 +
 +                        checkCache(locCfg, rmtCfg, n);
 +
 +                        // Check plugin cache configurations.
 +                        CachePluginManager pluginMgr = desc.pluginManager();
 +
 +                        pluginMgr.validateRemotes(rmtCfg, n);
 +                    }
 +                }
 +            }
          }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public void stop(boolean cancel) throws IgniteCheckedException {
 +        stopCaches(cancel);
  
          List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers();
  
@@@ -2032,83 -1932,93 +2058,93 @@@
  
              clientNodesMap = U.newHashMap(caches.size());
  
-             for (GridCacheAdapter<?, ?> cache : caches.values()) {
-                 DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
+             collectDataOnReconnectingNode(reqs, clientNodesMap, joiningNodeId);
+         }
+         else {
 -            reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size());
++            reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1);
  
-                 if (desc == null)
-                     continue;
+             clientNodesMap = ctx.discovery().clientNodesMap();
  
-                 // RequestId must be null because on different node will be different byte [] and
-                 // we get duplicate discovery data, for more details see
-                 // TcpDiscoveryNodeAddedMessage#addDiscoveryData.
-                 DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-                     null, cache.name(), null);
+             collectDataOnGridNode(reqs);
+         }
  
-                 req.startCacheConfiguration(desc.cacheConfiguration());
+         DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
  
-                 req.cacheType(desc.cacheType());
+         batch.clientNodes(clientNodesMap);
  
-                 req.deploymentId(desc.deploymentId());
+         batch.clientReconnect(reconnect);
  
-                 req.receivedFrom(desc.receivedFrom());
+         // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
+         batch.id(null);
  
-                 reqs.add(req);
+         return batch;
+     }
  
-                 Boolean nearEnabled = cache.isNear();
+     /**
+      * @param reqs requests.
+      */
+     private void collectDataOnGridNode(Collection<DynamicCacheChangeRequest> reqs) {
+         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+             DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
  
-                 Map<UUID, Boolean> map = U.newHashMap(1);
+             req.startCacheConfiguration(desc.cacheConfiguration());
  
-                 map.put(nodeId, nearEnabled);
+             req.cacheType(desc.cacheType());
  
-                 clientNodesMap.put(cache.name(), map);
-             }
-         }
-         else {
-             reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size() + 1);
+             req.deploymentId(desc.deploymentId());
  
-             for (DynamicCacheDescriptor desc : registeredCaches.values()) {
-                 DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-                     null, desc.cacheConfiguration().getName(), null);
+             req.receivedFrom(desc.receivedFrom());
  
-                 req.startCacheConfiguration(desc.cacheConfiguration());
+             reqs.add(req);
+         }
  
-                 req.cacheType(desc.cacheType());
+         for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
+             DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null);
  
-                 req.deploymentId(desc.deploymentId());
+             req.startCacheConfiguration(desc.cacheConfiguration());
  
-                 req.receivedFrom(desc.receivedFrom());
+             req.template(true);
  
-                 reqs.add(req);
-             }
+             reqs.add(req);
+         }
+     }
  
-             for (DynamicCacheDescriptor desc : registeredTemplates.values()) {
-                 DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(
-                     null, desc.cacheConfiguration().getName(), null);
+     /**
+      * @param reqs requests.
+      * @param clientNodesMap Client nodes map.
+      * @param nodeId Node id.
+      */
+     private void collectDataOnReconnectingNode(
+             Collection<DynamicCacheChangeRequest> reqs,
+             Map<String, Map<UUID, Boolean>> clientNodesMap,
+             UUID nodeId
+     ) {
+         for (GridCacheAdapter<?, ?> cache : caches.values()) {
+             DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name()));
  
-                 req.startCacheConfiguration(desc.cacheConfiguration());
+             if (desc == null)
+                 continue;
  
-                 req.template(true);
+             DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null);
  
-                 //todo check why id removed
-                /* req.deploymentId(desc.deploymentId());*/
+             req.startCacheConfiguration(desc.cacheConfiguration());
  
-                 reqs.add(req);
-             }
+             req.cacheType(desc.cacheType());
  
-             clientNodesMap = ctx.discovery().clientNodesMap();
-         }
+             req.deploymentId(desc.deploymentId());
  
-         DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs);
+             req.receivedFrom(desc.receivedFrom());
  
-         batch.clientNodes(clientNodesMap);
+             reqs.add(req);
  
-         batch.clientReconnect(reconnect);
+             Boolean nearEnabled = cache.isNear();
  
-         //todo check
-         // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
-         batch.id(null);
+             Map<UUID, Boolean> map = U.newHashMap(1);
  
-         return batch;
+             map.put(nodeId, nearEnabled);
+ 
+             clientNodesMap.put(cache.name(), map);
+         }
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 89c54c4,7e4deff..bd4c4ac
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@@ -260,51 -263,6 +264,51 @@@ public abstract class GridDistributedCa
      }
  
      /** {@inheritDoc} */
 +    @Override public long localSizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException {
 +        PeekModes modes = parsePeekModes(peekModes, true);
 +
 +        long size = 0;
 +
 +        if (modes.near)
 +            size += nearSize();
 +
 +        // Swap and offheap are disabled for near cache.
 +        if (modes.primary || modes.backup) {
 +            AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 +
 +            IgniteCacheOffheapManager offheap = ctx.offheap();
 +
 +            if (modes.offheap)
 +                size += offheap.entriesCount(modes.primary, modes.backup, topVer);
 +        }
 +
 +        return size;
 +    }
 +
 +    /** {@inheritDoc} */
-     @Override public long localSizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException {
++    @Override public long localSizeLong(int part, CachePeekMode[] peekModes) throws IgniteCheckedException {
 +        PeekModes modes = parsePeekModes(peekModes, true);
 +
 +        long size = 0;
 +
 +        if (modes.near)
 +            size += nearSize();
 +
 +        // Swap and offheap are disabled for near cache.
 +        if (modes.offheap) {
 +            AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 +
 +            IgniteCacheOffheapManager offheap = ctx.offheap();
 +
-             if (ctx.affinity().primary(ctx.localNode(), partition, topVer) && modes.primary ||
-                 ctx.affinity().backup(ctx.localNode(), partition, topVer) && modes.backup)
-                 size += offheap.entriesCount(partition);
++            if (ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer) && modes.primary ||
++                ctx.affinity().backupByPartition(ctx.localNode(), part, topVer) && modes.backup)
++                size += offheap.entriesCount(part);
 +        }
 +
 +        return size;
 +    }
 +
 +    /** {@inheritDoc} */
      @Override public String toString() {
          return S.toString(GridDistributedCacheAdapter.class, this, "super", super.toString());
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index fba1877,f5865e6..0c935ec
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@@ -650,10 -657,10 +654,11 @@@ public abstract class GridDhtCacheAdapt
          String taskName,
          @Nullable IgniteCacheExpiryPolicy expiry,
          boolean skipVals,
 -        boolean canRemap
 +        boolean canRemap,
 +        boolean recovery
      ) {
          return getAllAsync0(keys,
+             readerArgs,
              readThrough,
              /*don't check local tx. */false,
              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 2c3435d,f6db669..bbf0cb5
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@@ -395,29 -393,19 +398,20 @@@ public final class GridDhtGetFuture<K, 
          IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
  
          if (txFut == null || txFut.isDone()) {
-             if (tx == null) {
-                 fut = cache().getDhtAllAsync(
-                     keys.keySet(),
-                     readThrough,
-                     subjId,
-                     taskName,
-                     expiryPlc,
-                     skipVals,
-                     /*can remap*/true,
-                     recovery);
-             }
-             else {
-                 fut = tx.getAllAsync(cctx,
-                     null,
-                     keys.keySet(),
-                     /*deserialize binary*/false,
-                     skipVals,
-                     /*keep cache objects*/true,
-                     /*skip store*/!readThrough,
-                     false);
-             }
+             fut = cache().getDhtAllAsync(
+                 keys.keySet(),
+                 readerArgs,
+                 readThrough,
+                 subjId,
+                 taskName,
+                 expiryPlc,
+                 skipVals,
 -                /*can remap*/true);
++                /*can remap*/true,
++                recovery);
          }
          else {
+             final ReaderArguments args = readerArgs;
+ 
              // If we are here, then there were active transactions for some entries
              // when we were adding the reader. In that case we must wait for those
              // transactions to complete.
@@@ -428,27 -416,15 +422,16 @@@
                          if (e != null)
                              throw new GridClosureException(e);
  
-                         if (tx == null) {
-                             return cache().getDhtAllAsync(
-                                 keys.keySet(),
-                                 readThrough,
-                                 subjId,
-                                 taskName,
-                                 expiryPlc,
-                                 skipVals,
-                                 /*can remap*/true,
-                                 recovery);
-                         }
-                         else {
-                             return tx.getAllAsync(cctx,
-                                 null,
-                                 keys.keySet(),
-                                 /*deserialize binary*/false,
-                                 skipVals,
-                                 /*keep cache objects*/true,
-                                 /*skip store*/!readThrough,
-                                 false);
-                         }
+                         return cache().getDhtAllAsync(
+                             keys.keySet(),
+                             args,
+                             readThrough,
+                             subjId,
+                             taskName,
+                             expiryPlc,
+                             skipVals,
 -                            /*can remap*/true);
++                            /*can remap*/true,
++                            recovery);
                      }
                  }
              );

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 81d2570,0cb3920..ccccd76
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@@ -353,29 -351,19 +356,20 @@@ public final class GridDhtGetSingleFutu
          IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut;
  
          if (rdrFut == null || rdrFut.isDone()) {
-             if (tx == null) {
-                 fut = cache().getDhtAllAsync(
-                     Collections.singleton(key),
-                     readThrough,
-                     subjId,
-                     taskName,
-                     expiryPlc,
-                     skipVals,
-                     /*can remap*/true,
-                     recovery);
-             }
-             else {
-                 fut = tx.getAllAsync(cctx,
-                     null,
-                     Collections.singleton(key),
-                     /*deserialize binary*/false,
-                     skipVals,
-                     /*keep cache objects*/true,
-                     /*skip store*/!readThrough,
-                     false);
-             }
+             fut = cache().getDhtAllAsync(
+                 Collections.singleton(key),
+                 readerArgs,
+                 readThrough,
+                 subjId,
+                 taskName,
+                 expiryPlc,
+                 skipVals,
 -                /*can remap*/true);
++                /*can remap*/true,
++                recovery);
          }
          else {
+             final ReaderArguments args = readerArgs;
+ 
              rdrFut.listen(
                  new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
                      @Override public void apply(IgniteInternalFuture<Boolean> fut) {
@@@ -397,20 -384,7 +390,8 @@@
                                  taskName,
                                  expiryPlc,
                                  skipVals,
 -                                /*can remap*/true);
 +                                /*can remap*/true,
 +                                recovery);
-                         }
-                         else {
-                             fut0 = tx.getAllAsync(cctx,
-                                 null,
-                                 Collections.singleton(key),
-                                 /*deserialize binary*/false,
-                                 skipVals,
-                                 /*keep cache objects*/true,
-                                 /*skip store*/!readThrough,
-                                 false
-                             );
-                         }
  
                          fut0.listen(createGetFutureListener());
                      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 9b30593,9f8498a..444fcdb
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@@ -49,24 -44,23 +49,25 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
  import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
- import org.apache.ignite.internal.util.GridCircularBuffer;
+ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
  import org.apache.ignite.internal.util.future.GridFutureAdapter;
 -import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 +import org.apache.ignite.internal.util.lang.GridIterator;
  import org.apache.ignite.internal.util.tostring.GridToStringExclude;
- import org.apache.ignite.internal.util.typedef.CI1;
- import org.apache.ignite.internal.util.typedef.T2;
+ import org.apache.ignite.internal.util.typedef.F;
  import org.apache.ignite.internal.util.typedef.internal.CU;
  import org.apache.ignite.internal.util.typedef.internal.S;
  import org.apache.ignite.internal.util.typedef.internal.U;
  import org.apache.ignite.lang.IgniteUuid;
  import org.jetbrains.annotations.NotNull;
  import org.jetbrains.annotations.Nullable;
+ import org.jsr166.ConcurrentLinkedDeque8;
  
  import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
+ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
  import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
 +import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
 +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
  import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
@@@ -160,22 -149,9 +166,24 @@@ public class GridDhtLocalPartition impl
          int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
              Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
  
-         rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
+         rmvQueueMaxSize = U.ceilPow2(delQueueSize);
+ 
+         rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
 +
 +        try {
 +            store = cctx.offheap().createCacheDataStore(id);
 +        }
 +        catch (IgniteCheckedException e) {
 +            // TODO ignite-db
 +            throw new IgniteException(e);
 +        }
 +    }
 +
 +    /**
 +     * @return Data store.
 +     */
 +    public CacheDataStore dataStore() {
 +        return store;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 322bbe3,75a275c..e11a770
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -761,14 -691,12 +761,14 @@@ class GridDhtPartitionTopologyImpl impl
          try {
              loc = locParts.get(p);
  
 +            state = loc != null ? loc.state() : null;
 +
-             boolean belongs = cctx.affinity().localNode(p, topVer);
+             boolean belongs = cctx.affinity().partitionLocalNode(p, topVer);
  
 -            if (loc != null && loc.state() == EVICTED) {
 +            if (loc != null && state == EVICTED) {
                  locParts.set(p, loc = null);
  
 -                if (!belongs)
 +                if (!treatAllPartAsLoc && !belongs)
                      throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
                          "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
                          "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index debf8b6,67e1993..842a70b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@@ -583,21 -585,25 +585,21 @@@ public abstract class GridDhtTxLocalAda
                  if (txEntry == null) {
                      GridDhtCacheEntry cached;
  
 -                    if (dhtCache.context().isSwapOrOffheapEnabled()) {
 -                        while (true) {
 -                            try {
 -                                cached = dhtCache.entryExx(key, topVer);
 +                    while (true) {
 +                        try {
 +                            cached = dhtCache.entryExx(key, topVer);
  
 -                                cached.unswap(read);
 +                            cached.unswap(read);
  
 -                                break;
 -                            }
 -                            catch (GridCacheEntryRemovedException ignored) {
 -                                if (log.isDebugEnabled())
 -                                    log.debug("Get removed entry: " + key);
 -                            }
 +                            break;
 +                        }
-                         catch (GridCacheEntryRemovedException ignore) {
++                        catch (GridCacheEntryRemovedException ignored) {
 +                            if (log.isDebugEnabled())
 +                                log.debug("Get removed entry: " + key);
                          }
                      }
 -                    else
 -                        cached = dhtCache.entryExx(key, topVer);
  
 -                    addActiveCache(dhtCache.context());
 +                    addActiveCache(dhtCache.context(), false);
  
                      txEntry = addEntry(NOOP,
                          null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6c4da68,1fe2d69..a088adf
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@@ -353,10 -341,10 +354,11 @@@ public class GridPartitionedGetFuture<K
                      topVer,
                      subjId,
                      taskName == null ? 0 : taskName.hashCode(),
+                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                      expiryPlc != null ? expiryPlc.forAccess() : -1L,
                      skipVals,
 -                    cctx.deploymentEnabled());
 +                    cctx.deploymentEnabled(),
 +                    recovery);
  
                  add(fut); // Append new future.
  
@@@ -461,9 -451,11 +463,9 @@@
                      GridCacheVersion ver = null;
  
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         EntryGetResult res = entry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/false,
                              /*event*/!skipVals,
                              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index ea69743,bdccaa3..a46b7f6
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@@ -316,10 -301,10 +318,11 @@@ public class GridPartitionedSingleGetFu
                      topVer,
                      subjId,
                      taskName == null ? 0 : taskName.hashCode(),
+                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                      expiryPlc != null ? expiryPlc.forAccess() : -1L,
                      skipVals,
 -                    cctx.deploymentEnabled());
 +                    cctx.deploymentEnabled(),
 +                    recovery);
              }
  
              try {
@@@ -388,9 -376,11 +391,9 @@@
                      GridCacheVersion ver = null;
  
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         EntryGetResult res = entry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/false,
                              /*event*/!skipVals,
                              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 72489fd,1b6179e..cdb19a4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -555,11 -548,8 +556,10 @@@ public class GridDhtAtomicCache<K, V> e
      /** {@inheritDoc} */
      @Override protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer)
          throws IgniteCheckedException {
 +        CacheOperationContext opCtx = ctx.operationContextPerCall();
 +
          return getAllAsyncInternal(keys,
              !ctx.config().isReadFromBackup(),
-             true,
              null,
              ctx.kernalContext().job().currentTaskName(),
              deserializeBinary,
@@@ -1607,10 -1584,12 +1605,10 @@@
                              GridCacheVersion ver = null;
  
                              if (needVer) {
-                                 T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                 EntryGetResult res = entry.innerGetVersioned(
                                      null,
                                      null,
 -                                    /*swap*/true,
 -                                    /*unmarshal*/true,
 -                                    /**update-metrics*/false,
 +                                    /*update-metrics*/false,
                                      /*event*/!skipVals,
                                      subjId,
                                      null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index d5e8389,4b090ad..73d46c4
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@@ -476,9 -472,11 +477,9 @@@ public class GridDhtColocatedCache<K, V
                              GridCacheVersion ver = null;
  
                              if (needVer) {
-                                 T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                                 EntryGetResult res = entry.innerGetVersioned(
                                      null,
                                      null,
 -                                    /*swap*/true,
 -                                    /*unmarshal*/true,
                                      /**update-metrics*/false,
                                      /*event*/!skipVals,
                                      subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 358ec8f,79ca108..b7d6a2b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -911,37 -911,38 +917,38 @@@ public final class GridDhtColocatedLock
                                          !topLocked &&
                                          (tx == null || !tx.hasRemoteLocks());
  
 -                                        first = false;
 -                                    }
 +                                    first = false;
 +                                }
  
-                                 req = new GridNearLockRequest(
-                                     cctx.cacheId(),
-                                     topVer,
-                                     cctx.nodeId(),
-                                     threadId,
-                                     futId,
-                                     lockVer,
-                                     inTx(),
-                                     implicitTx(),
-                                     implicitSingleTx(),
-                                     read,
-                                     retval,
-                                     isolation(),
-                                     isInvalidate(),
-                                     timeout,
-                                     mappedKeys.size(),
-                                     inTx() ? tx.size() : mappedKeys.size(),
-                                     inTx() && tx.syncMode() == FULL_SYNC,
-                                     inTx() ? tx.subjectId() : null,
-                                     inTx() ? tx.taskNameHash() : 0,
-                                     read ? accessTtl : -1L,
-                                     skipStore,
-                                     keepBinary,
-                                     clientFirst,
-                                     cctx.deploymentEnabled());
+                                     req = new GridNearLockRequest(
+                                         cctx.cacheId(),
+                                         topVer,
+                                         cctx.nodeId(),
+                                         threadId,
+                                         futId,
+                                         lockVer,
+                                         inTx(),
+                                         implicitTx(),
+                                         implicitSingleTx(),
+                                         read,
+                                         retval,
+                                         isolation(),
+                                         isInvalidate(),
+                                         timeout,
+                                         mappedKeys.size(),
+                                         inTx() ? tx.size() : mappedKeys.size(),
+                                         inTx() && tx.syncMode() == FULL_SYNC,
+                                         inTx() ? tx.subjectId() : null,
+                                         inTx() ? tx.taskNameHash() : 0,
+                                         read ? createTtl : -1L,
+                                         read ? accessTtl : -1L,
+                                         skipStore,
+                                         keepBinary,
+                                         clientFirst,
+                                         cctx.deploymentEnabled());
  
 -                                    mapping.request(req);
 -                                }
 +                                mapping.request(req);
 +                            }
  
                              distributedKeys.add(key);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 97d768a,9942423..b80ad04
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@@ -272,24 -288,22 +272,24 @@@ class GridDhtPartitionSupplier 
                      boolean partMissing = false;
  
                      if (phase == SupplyContextPhase.NEW)
 -                        phase = SupplyContextPhase.ONHEAP;
 +                        phase = SupplyContextPhase.OFFHEAP;
 +
 +                    if (phase == SupplyContextPhase.OFFHEAP) {
 +                        IgniteRebalanceIterator iter;
 +
 +                        if (sctx == null || sctx.entryIt == null) {
 +                            iter = cctx.offheap().rebalanceIterator(part, d.topologyVersion(), d.partitionCounter(part));
  
 -                    if (phase == SupplyContextPhase.ONHEAP) {
 -                        Iterator<GridCacheMapEntry> entIt = sctx != null ?
 -                            (Iterator<GridCacheMapEntry>)sctx.entryIt : loc.allEntries().iterator();
 +                            if (!iter.historical())
 +                                s.clean(part);
 +                        }
 +                        else
 +                            iter = (IgniteRebalanceIterator)sctx.entryIt;
  
 -                        while (entIt.hasNext()) {
 +                        while (iter.hasNext()) {
-                             if (!cctx.affinity().belongs(node, part, d.topologyVersion())) {
+                             if (!cctx.affinity().partitionBelongs(node, part, d.topologyVersion())) {
 -                                // Demander no longer needs this partition, so we send '-1' partition and move on.
 +                                // Demander no longer needs this partition,
 +                                // so we send '-1' partition and move on.
                                  s.missed(part);
  
                                  if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index af0085d,a334fd5..68ce5ea
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1402,11 -1183,10 +1407,13 @@@ public class GridDhtPartitionsExchangeF
  
              if (crd.isLocal()) {
                  if (remaining.remove(node.id())) {
-                     updatePartitionSingleMap(node, msg);
+                     updateSingleMap = true;
+ 
+                     pendingSingleUpdates++;
  
 +                    if (exchangeOnChangeGlobalState && msg.getException() != null)
 +                        changeGlobalStateExceptions.put(node.id(), msg.getException());
 +
                      allReceived = remaining.isEmpty();
                  }
              }
@@@ -1414,8 -1194,42 +1421,42 @@@
                  singleMsgs.put(node, msg);
          }
  
-         if (allReceived)
+         if (updateSingleMap) {
+             try {
 -                updatePartitionSingleMap(msg);
++                updatePartitionSingleMap(node, msg);
+             }
+             finally {
+                 synchronized (mux) {
+                     assert pendingSingleUpdates > 0;
+ 
+                     pendingSingleUpdates--;
+ 
+                     if (pendingSingleUpdates == 0)
+                         mux.notifyAll();
+                 }
+             }
+         }
+ 
+         if (allReceived) {
+             awaitSingleMapUpdates();
+ 
              onAllReceived();
+         }
+     }
+ 
+     /**
+      *
+      */
+     private void awaitSingleMapUpdates() {
+         synchronized (mux) {
+             try {
+                 while (pendingSingleUpdates > 0)
+                     U.wait(mux);
+             }
+             catch (IgniteInterruptedCheckedException e) {
+                 U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
+             }
+         }
      }
  
      /**
@@@ -1937,10 -1613,9 +1978,12 @@@
                          }
  
                          if (crd0.isLocal()) {
 +                            if (exchangeOnChangeGlobalState && changeGlobalStateE !=null)
 +                                changeGlobalStateExceptions.put(crd0.id(), changeGlobalStateE);
 +
                              if (allReceived) {
+                                 awaitSingleMapUpdates();
+ 
                                  onAllReceived();
  
                                  return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index ff4e838,480b323..641f73d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@@ -377,10 -375,10 +378,11 @@@ public final class GridNearGetFuture<K
                      topVer,
                      subjId,
                      taskName == null ? 0 : taskName.hashCode(),
+                     expiryPlc != null ? expiryPlc.forCreate() : -1L,
                      expiryPlc != null ? expiryPlc.forAccess() : -1L,
                      skipVals,
 -                    cctx.deploymentEnabled());
 +                    cctx.deploymentEnabled(),
 +                    recovery);
  
                  add(fut); // Append new future.
  
@@@ -441,9 -439,11 +443,9 @@@
                  // First we peek into near cache.
                  if (isNear) {
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                         EntryGetResult res = entry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/true,
                              /*event*/!skipVals,
                              subjId,
@@@ -577,9 -580,11 +580,9 @@@
                      boolean isNew = dhtEntry.isNewLocked() || !dhtEntry.valid(topVer);
  
                      if (needVer) {
-                         T2<CacheObject, GridCacheVersion> res = dhtEntry.innerGetVersioned(
+                         EntryGetResult res = dhtEntry.innerGetVersioned(
                              null,
                              null,
 -                            /*swap*/true,
 -                            /*unmarshal*/true,
                              /**update-metrics*/false,
                              /*event*/!nearRead && !skipVals,
                              subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index b096d5d,7ca2635..c5aec19
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@@ -137,10 -138,10 +141,11 @@@ public class GridNearGetRequest extend
          @NotNull AffinityTopologyVersion topVer,
          UUID subjId,
          int taskNameHash,
+         long createTtl,
          long accessTtl,
          boolean skipVals,
 -        boolean addDepInfo
 +        boolean addDepInfo,
 +        boolean recovery
      ) {
          assert futId != null;
          assert miniId != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index f09b6c8,976f05f..1d610c7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@@ -62,6 -62,19 +62,19 @@@ public abstract class GridNearOptimisti
          }
  
          if (topVer != null) {
+             try {
 -                IgniteCheckedException err = tx.txState().validateTopology(cctx, topologyReadLock());
++                IgniteCheckedException err = tx.txState().validateTopology(cctx, false, topologyReadLock());
+ 
+                 if (err != null) {
+                     onDone(err);
+ 
+                     return;
+                 }
+             }
+             finally {
+                 topologyReadUnlock();
+             }
+ 
              tx.topologyVersion(topVer);
  
              cctx.mvcc().addFuture(this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ba48e7a,197792b..cc2c090
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@@ -329,17 -329,20 +329,22 @@@ public class GridNearTxLocal extends Gr
          final boolean skipVals,
          final boolean needVer,
          boolean keepBinary,
 +        boolean recovery,
+         final ExpiryPolicy expiryPlc,
          final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
      ) {
+         IgniteCacheExpiryPolicy expiryPlc0 = optimistic() ?
+             accessPolicy(cacheCtx, keys) :
+             cacheCtx.cache().expiryPolicy(expiryPlc);
+ 
          if (cacheCtx.isNear()) {
              return cacheCtx.nearTx().txLoadAsync(this,
                  topVer,
                  keys,
                  readThrough,
                  /*deserializeBinary*/false,
 +                recovery,
-                 accessPolicy(cacheCtx, keys),
+                 expiryPlc0,
                  skipVals,
                  needVer).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
                  @Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
@@@ -402,8 -404,7 +407,8 @@@
                      CU.subjectId(this, cctx),
                      resolveTaskName(),
                      /*deserializeBinary*/false,
 +                    recovery,
-                     accessPolicy(cacheCtx, keys),
+                     expiryPlc0,
                      skipVals,
                      /*can remap*/true,
                      needVer,
@@@ -437,7 -438,7 +442,8 @@@
                  skipVals,
                  keepBinary,
                  needVer,
 +                recovery,
+                 expiryPlc,
                  c);
          }
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 7aaa476,695fdcb..d687d50
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@@ -410,9 -415,11 +411,9 @@@ public class GridLocalAtomicCache<K, V
                          GridCacheVersion ver;
  
                          if (needVer) {
-                             T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(
+                             EntryGetResult res = entry.innerGetVersioned(
                                  null,
                                  null,
 -                                /*swap*/swapOrOffheap,
 -                                /*unmarshal*/true,
                                  /**update-metrics*/false,
                                  /*event*/!skipVals,
                                  subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 6731179,c434401..f7608c9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -365,12 -380,22 +377,22 @@@ public abstract class GridCacheQueryMan
       * @param key Key.
       * @throws IgniteCheckedException If failed.
       */
 -    public void onSwap(CacheObject key) throws IgniteCheckedException {
 +    public void onSwap(KeyCacheObject key, int partId) throws IgniteCheckedException {
+         if(!enabled)
+             return;
+ 
          if (!enterBusy())
              return; // Ignore index update when node is stopping.
  
          try {
-             qryProc.onSwap(space, key, partId);
+             if (isIndexingSpiEnabled()) {
+                 Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+ 
+                 cctx.kernalContext().indexing().onSwap(space, key0);
+             }
+ 
 -            if(qryProcEnabled)
 -                qryProc.onSwap(space, key);
++            if (qryProcEnabled)
++                qryProc.onSwap(space, key, partId);
          }
          finally {
              leaveBusy();
@@@ -384,12 -417,26 +414,26 @@@
       * @param val Value
       * @throws IgniteCheckedException If failed.
       */
 -    public void onUnswap(CacheObject key, CacheObject val) throws IgniteCheckedException {
 -        if(!enabled)
 +    public void onUnswap(KeyCacheObject key, int partId, CacheObject val) throws IgniteCheckedException {
++        if (!enabled)
+             return;
+ 
          if (!enterBusy())
              return; // Ignore index update when node is stopping.
  
          try {
-             qryProc.onUnswap(space, key, partId, val);
+             if (isIndexingSpiEnabled()) {
+                 CacheObjectContext coctx = cctx.cacheObjectContext();
+ 
+                 Object key0 = unwrapIfNeeded(key, coctx);
+ 
+                 Object val0 = unwrapIfNeeded(val, coctx);
+ 
+                 cctx.kernalContext().indexing().onUnswap(space, key0, val0);
+             }
+ 
+             if(qryProcEnabled)
 -                qryProc.onUnswap(space, key, val);
++                qryProc.onUnswap(space, key, partId, val);
          }
          finally {
              leaveBusy();
@@@ -434,10 -470,21 +478,21 @@@
              return; // No-op.
  
          if (!enterBusy())
 -            return; // Ignore index update when node is stopping.
 +            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
  
          try {
-             qryProc.store(space, key, partId, prevVal, prevVer, val, ver, expirationTime, link);
+             if (isIndexingSpiEnabled()) {
+                 CacheObjectContext coctx = cctx.cacheObjectContext();
+ 
+                 Object key0 = unwrapIfNeeded(key, coctx);
+ 
+                 Object val0 = unwrapIfNeeded(val, coctx);
+ 
+                 cctx.kernalContext().indexing().store(space, key0, val0, expirationTime);
+             }
+ 
+             if(qryProcEnabled)
 -                qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime);
++                qryProc.store(space, key, partId, prevVal, prevVer, val, ver, expirationTime, link);
          }
          finally {
              invalidateResultCache();
@@@ -464,7 -509,14 +519,14 @@@
              return; // Ignore index update when node is stopping.
  
          try {
-             qryProc.remove(space, key, partId, val, ver);
+             if (isIndexingSpiEnabled()) {
+                 Object key0 = unwrapIfNeeded(key, cctx.cacheObjectContext());
+ 
+                 cctx.kernalContext().indexing().remove(space, key0);
+             }
+ 
 -            if(qryProcEnabled)
 -                qryProc.remove(space, key, val);
++            if (qryProcEnabled)
++                qryProc.remove(space, key, partId, val, ver);
          }
          finally {
              invalidateResultCache();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 41eec6a,b3f0684..1eb9e10
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@@ -864,21 -882,21 +885,21 @@@ public class CacheContinuousQueryHandle
                  GridCacheAffinityManager aff = cctx.affinity();
  
                  if (initUpdCntrsPerNode != null) {
-                     for (ClusterNode node : aff.nodes(partId, initTopVer)) {
+                     for (ClusterNode node : aff.nodesByPartition(partId, initTopVer)) {
 -                        Map<Integer, Long> map = initUpdCntrsPerNode.get(node.id());
 +                        Map<Integer, T2<Long, Long>> map = initUpdCntrsPerNode.get(node.id());
  
                          if (map != null) {
 -                            partCntr = map.get(partId);
 +                            partCntrs = map.get(partId);
  
                              break;
                          }
                      }
                  }
                  else if (initUpdCntrs != null)
 -                    partCntr = initUpdCntrs.get(partId);
 +                    partCntrs = initUpdCntrs.get(partId);
              }
  
-             rec = new PartitionRecovery(ctx.log(getClass()), initTopVer0, partCntrs != null ? partCntrs.get2() : null);
 -            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0, partCntr);
++            rec = new PartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), initTopVer0, partCntrs != null ? partCntrs.get2() : null);
  
              PartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/1748e226/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index ba2ab3c,af87dfe..b4d58df
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -1502,20 -1490,19 +1502,20 @@@ public class IgniteTxHandler 
                                          /*expiryPlc*/null,
                                          /*keepBinary*/true);
  
 -                                    if (val == null)
 -                                        val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
 +                                        if (val == null)
 +                                            val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
  
 -                                    if (val != null)
 -                                        entry.readValue(val);
 +                                        if (val != null)
 +                                            entry.readValue(val);
  
-                                         break;
-                                     }
-                                     catch (GridCacheEntryRemovedException ignore) {
-                                         if (log.isDebugEnabled())
-                                             log.debug("Got entry removed exception, will retry: " + entry.txKey());
+                                     break;
+                                 }
+                                 catch (GridCacheEntryRemovedException ignored) {
+                                     if (log.isDebugEnabled())
+                                         log.debug("Got entry removed exception, will retry: " + entry.txKey());
  
 -                                    entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
 +                                        entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
 +                                    }
                                  }
                              }
                          }


Mime
View raw message