ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [11/50] [abbrv] ignite git commit: Merge with master - WIP.
Date Thu, 29 Dec 2016 09:37:15 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index bbd2db0,14ce1f9..79358e8
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@@ -211,47 -208,34 +216,55 @@@ public interface GridDhtPartitionTopolo
       * @param exchId Exchange ID.
       * @param partMap Update partition map.
       * @param cntrMap Partition update counters.
-      * @return Local partition map if there were evictions or {@code null} otherwise.
+      * @return {@code True} if topology state changed.
       */
-     public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+     public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
          GridDhtPartitionFullMap partMap,
 -        @Nullable Map<Integer, Long> cntrMap);
 +        @Nullable Map<Integer, T2<Long, Long>> cntrMap);
  
      /**
       * @param exchId Exchange ID.
       * @param parts Partitions.
       * @param cntrMap Partition update counters.
-      * @return Local partition map if there were evictions or {@code null} otherwise.
+      * @param checkEvictions Check evictions flag.
+      * @return {@code True} if topology state changed.
       */
-     @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+     @Nullable public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
          GridDhtPartitionMap2 parts,
-         @Nullable Map<Integer, T2<Long, Long>> cntrMap);
 -        @Nullable Map<Integer, Long> cntrMap,
++        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
+         boolean checkEvictions);
  
      /**
 +     * Checks if there is at least one owner for each partition in the cache topology.
 +     * If not, marks such a partition as LOST.
 +     * <p>
 +     * This method should be called on topology coordinator after all partition messages are received.
 +     *
 +     * @param discoEvt Discovery event for which we detect lost partitions.
 +     * @return {@code True} if partitions state got updated.
 +     */
 +    public boolean detectLostPartitions(DiscoveryEvent discoEvt);
 +
 +    /**
 +     * Resets the state of all LOST partitions to OWNING.
 +     */
 +    public void resetLostPartitions();
 +
 +    /**
 +     * @return Collection of lost partitions, if any.
 +     */
 +    public Collection<Integer> lostPartitions();
 +
 +    /**
+      *
+      */
+     public void checkEvictions();
+ 
+     /**
+      * @param skipZeros If {@code true} then filters out zero counters.
       * @return Partition update counters.
       */
-     public Map<Integer, T2<Long, Long>> updateCounters();
+     public Map<Integer, Long> updateCounters(boolean skipZeros);
  
      /**
       * @param part Partition to own.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 05c5c2d,1b4dcc9..5bda0bf
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -30,18 -30,14 +30,19 @@@ import java.util.Set
  import java.util.UUID;
  import java.util.concurrent.atomic.AtomicReferenceArray;
  import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteException;
  import org.apache.ignite.IgniteLogger;
  import org.apache.ignite.IgniteSystemProperties;
 +import org.apache.ignite.cache.PartitionLossPolicy;
  import org.apache.ignite.cluster.ClusterNode;
  import org.apache.ignite.events.DiscoveryEvent;
 +import org.apache.ignite.events.EventType;
  import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
  import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
 +import org.apache.ignite.internal.processors.cache.CacheState;
  import org.apache.ignite.internal.processors.cache.GridCacheContext;
  import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
@@@ -593,12 -539,8 +591,10 @@@ import static org.apache.ignite.interna
  
      /** {@inheritDoc} */
      @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
 +        treatAllPartitionAsLocal = false;
 +
          boolean changed = waitForRent();
  
-         ClusterNode loc = cctx.localNode();
- 
          int num = cctx.affinity().partitions();
  
          AffinityTopologyVersion topVer = exchFut.topologyVersion();
@@@ -676,10 -618,10 +672,10 @@@
                      if (locPart != null) {
                          GridDhtPartitionState state = locPart.state();
  
 -                        if (state == MOVING) {
 +                        if (state == MOVING && cctx.shared().cache().globalState() == CacheState.ACTIVE) {
                              locPart.rent(false);
  
-                             updateLocal(p, loc.id(), locPart.state(), updateSeq);
+                             updateSeq = updateLocal(p, locPart.state(), updateSeq);
  
                              changed = true;
  
@@@ -1067,9 -979,9 +1066,9 @@@
  
      /** {@inheritDoc} */
      @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-     @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+     @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
          GridDhtPartitionFullMap partMap,
 -        @Nullable Map<Integer, Long> cntrMap) {
 +        @Nullable Map<Integer, T2<Long, Long>> cntrMap) {
          if (log.isDebugEnabled())
              log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
  
@@@ -1079,13 -991,13 +1078,13 @@@
  
          try {
              if (stopping)
-                 return null;
+                 return false;
  
              if (cntrMap != null) {
 -                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
 -                    Long cntr = this.cntrMap.get(e.getKey());
 +                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
 +                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
  
 -                    if (cntr == null || cntr < e.getValue())
 +                    if (cntr == null || cntr.get2() < e.getValue().get2())
                          this.cntrMap.put(e.getKey(), e.getValue());
                  }
  
@@@ -1240,10 -1104,7 +1239,10 @@@
              if (log.isDebugEnabled())
                  log.debug("Partition map after full update: " + fullMapString());
  
 +            if (changed)
 +                cctx.shared().exchange().scheduleResendPartitions();
 +
-             return changed ? localPartitionMap() : null;
+             return changed;
          }
          finally {
              lock.writeLock().unlock();
@@@ -1251,10 -1112,10 +1250,10 @@@
      }
  
      /** {@inheritDoc} */
-     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-     @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+     @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
          GridDhtPartitionMap2 parts,
-         @Nullable Map<Integer, T2<Long, Long>> cntrMap) {
 -        @Nullable Map<Integer, Long> cntrMap,
++        @Nullable Map<Integer, T2<Long, Long>> cntrMap,
+         boolean checkEvictions) {
          if (log.isDebugEnabled())
              log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
  
@@@ -1270,26 -1131,21 +1269,26 @@@
  
          try {
              if (stopping)
-                 return null;
+                 return false;
  
              if (cntrMap != null) {
 -                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
 -                    Integer p = e.getKey();
 -
 -                    Long cntr = this.cntrMap.get(p);
 +                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
 +                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
  
 -                    if (cntr == null || cntr < e.getValue())
 -                        this.cntrMap.put(p, e.getValue());
 +                    if (cntr == null || cntr.get2() < e.getValue().get2())
 +                        this.cntrMap.put(e.getKey(), e.getValue());
 +                }
  
-                 for (int i = 0; i < locParts.length(); i++) {
-                     GridDhtLocalPartition part = locParts.get(i);
++                for (int p = 0; p < locParts.length(); p++) {
+                     GridDhtLocalPartition part = locParts.get(p);
  
 -                    if (part != null)
 -                        part.updateCounter(e.getValue());
 +                    if (part == null)
 +                        continue;
 +
 +                    T2<Long, Long> cntr = cntrMap.get(part.id());
 +
 +                    if (cntr != null && cntr.get2() > part.updateCounter())
 +                        part.updateCounter(cntr.get2());
                  }
              }
  
@@@ -1320,58 -1176,86 +1319,97 @@@
  
              long updateSeq = this.updateSeq.incrementAndGet();
  
-             node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
+             node2part.newUpdateSequence(updateSeq);
  
-             boolean changed = false;
+             boolean changed = cur == null || !cur.equals(parts);
  
-             if (cur == null || !cur.equals(parts))
-                 changed = true;
+             if (changed) {
+                 node2part.put(parts.nodeId(), parts);
  
-             node2part.put(parts.nodeId(), parts);
+                 // Add new mappings.
+                 for (Integer p : parts.keySet()) {
+                     Set<UUID> ids = part2node.get(p);
  
-             part2node = new HashMap<>(part2node);
+                     if (ids == null)
+                         // Initialize HashSet to size 3 in anticipation that there won't be
+                         // more than 3 nodes per partition.
+                         part2node.put(p, ids = U.newHashSet(3));
  
-             // Add new mappings.
-             for (Integer p : parts.keySet()) {
-                 Set<UUID> ids = part2node.get(p);
+                     ids.add(parts.nodeId());
+                 }
  
-                 if (ids == null)
-                     // Initialize HashSet to size 3 in anticipation that there won't be
-                     // more than 3 nodes per partition.
-                     part2node.put(p, ids = U.newHashSet(3));
+                 // Remove obsolete mappings.
+                 if (cur != null) {
+                     for (Integer p : cur.keySet()) {
+                         if (parts.containsKey(p))
+                             continue;
  
-                 changed |= ids.add(parts.nodeId());
-             }
+                         Set<UUID> ids = part2node.get(p);
  
-             // Remove obsolete mappings.
-             if (cur != null) {
-                 for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                     Set<UUID> ids = part2node.get(p);
- 
-                     if (ids != null)
-                         changed |= ids.remove(parts.nodeId());
+                         if (ids != null)
+                             ids.remove(parts.nodeId());
+                     }
                  }
              }
+             else
+                 cur.updateSequence(parts.updateSequence(), parts.topologyVersion());
  
 -            if (checkEvictions)
 -                changed |= checkEvictions(updateSeq);
 +            AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
 +
 +            if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
 +                List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
 +
-                 changed |= checkEvictions(updateSeq, aff);
++                if (checkEvictions)
++                    changed |= checkEvictions(updateSeq, aff);
 +
 +                updateRebalanceVersion(aff);
 +            }
  
              consistencyCheck();
  
              if (log.isDebugEnabled())
                  log.debug("Partition map after single update: " + fullMapString());
  
 +            if (changed)
 +                cctx.shared().exchange().scheduleResendPartitions();
 +
-             return changed ? localPartitionMap() : null;
+             return changed;
+         }
+         finally {
+             lock.writeLock().unlock();
+         }
+     }
+ 
+     /**
+      * @param updateSeq Update sequence.
+      * @return {@code True} if state changed.
+      */
+     private boolean checkEvictions(long updateSeq) {
+         AffinityTopologyVersion affVer = cctx.affinity().affinityTopologyVersion();
+ 
+         boolean changed = false;
+ 
+         if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
+             List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
+ 
+             changed = checkEvictions(updateSeq, aff);
+ 
+             updateRebalanceVersion(aff);
+         }
+ 
+         return changed;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void checkEvictions() {
+         lock.writeLock().lock();
+ 
+         try {
+             long updateSeq = this.updateSeq.incrementAndGet();
+ 
+             node2part.newUpdateSequence(updateSeq);
+ 
+             checkEvictions(updateSeq);
          }
          finally {
              lock.writeLock().unlock();
@@@ -1698,24 -1387,32 +1748,34 @@@
              }
          }
  
 -        UUID locNodeId = cctx.localNodeId();
 +        if (node2part != null) {
-             GridDhtPartitionMap2 map = node2part.get(nodeId);
++            UUID locNodeId = cctx.localNodeId();
+ 
 -        GridDhtPartitionMap2 map = node2part.get(locNodeId);
++            GridDhtPartitionMap2 map = node2part.get(locNodeId);
  
-             if (map == null)
-                 node2part.put(nodeId, map = new GridDhtPartitionMap2(nodeId, updateSeq, topVer,
-                     Collections.<Integer, GridDhtPartitionState>emptyMap(), false));
 -        if (map == null) {
 -            map = new GridDhtPartitionMap2(locNodeId,
 -                updateSeq,
 -                topVer,
 -                Collections.<Integer, GridDhtPartitionState>emptyMap(),
 -                false);
++            if (map == null) {
++                map = new GridDhtPartitionMap2(locNodeId,
++                    updateSeq,
++                    topVer,
++                    Collections.<Integer, GridDhtPartitionState>emptyMap(),
++                    false);
+ 
 -            node2part.put(locNodeId, map);
 -        }
++                node2part.put(locNodeId, map);
++            }
  
 -        map.updateSequence(updateSeq, topVer);
 +            map.updateSequence(updateSeq, topVer);
  
 -        map.put(p, state);
 +            map.put(p, state);
  
 -        Set<UUID> ids = part2node.get(p);
 +            Set<UUID> ids = part2node.get(p);
  
 -        if (ids == null)
 -            part2node.put(p, ids = U.newHashSet(3));
 +            if (ids == null)
 +                part2node.put(p, ids = U.newHashSet(3));
  
-             ids.add(nodeId);
 -        ids.add(locNodeId);
++            ids.add(locNodeId);
 +        }
+ 
+         return updateSeq;
      }
  
      /**
@@@ -1807,23 -1500,26 +1863,24 @@@
      }
  
      /** {@inheritDoc} */
-     @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) {
 -    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
++    @Override public Map<Integer, T2<Long, Long>> updateCounters(boolean skipZeros) {
          lock.readLock().lock();
  
          try {
-             return node2part.get(nodeId);
-         }
-         finally {
-             lock.readLock().unlock();
-         }
-     }
 -            Map<Integer, Long> res;
++            Map<Integer, T2<Long, Long>> res;
  
-     /** {@inheritDoc} */
-     @Override public Map<Integer, T2<Long, Long>> updateCounters() {
-         lock.readLock().lock();
+             if (skipZeros) {
+                 res = U.newHashMap(cntrMap.size());
  
-         try {
-             Map<Integer, T2<Long, Long>> res = new HashMap<>(cntrMap);
 -                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
 -                    Long cntr = e.getValue();
 -
 -                    if (ZERO.equals(cntr))
++                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
++                    if (ZERO.equals(e.getValue().get1()) && ZERO.equals(e.getValue().get2()))
+                         continue;
+ 
 -                    res.put(e.getKey(), cntr);
++                    res.put(e.getKey(), e.getValue());
+                 }
+             }
+             else
+                 res = new HashMap<>(cntrMap);
  
              for (int i = 0; i < locParts.length(); i++) {
                  GridDhtLocalPartition part = locParts.get(i);
@@@ -1831,11 -1527,14 +1888,14 @@@
                  if (part == null)
                      continue;
  
 -                Long cntr0 = res.get(part.id());
 -                long cntr1 = part.updateCounter();
 +                T2<Long, Long> cntr0 = res.get(part.id());
 +                Long cntr1 = part.initialUpdateCounter();
  
+                 if (skipZeros && cntr1 == 0L)
+                     continue;
+ 
 -                if (cntr0 == null || cntr1 > cntr0)
 -                    res.put(part.id(), cntr1);
 +                if (cntr0 == null || cntr1 > cntr0.get1())
 +                    res.put(part.id(), new T2<Long, Long>(cntr1, part.updateCounter()));
              }
  
              return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 8096950,8aeecf8..b04dc30
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -365,8 -361,12 +365,13 @@@ public final class GridDhtTxPrepareFutu
  
                  boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
  
-                 if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM ||
-                     tx.nearOnOriginatingNode() || tx.hasInterceptor()) {
+                 CacheObject val;
+                 CacheObject oldVal = null;
+ 
 -                boolean readOld = hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM;
++                boolean readOld = hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM ||
++                    tx.nearOnOriginatingNode() || tx.hasInterceptor();
+ 
+                 if (readOld) {
                      cached.unswap(retVal);
  
                      boolean readThrough = !txEntry.skipStore() &&
@@@ -381,9 -381,10 +386,9 @@@
  
                      final boolean keepBinary = txEntry.keepBinary();
  
-                     CacheObject val = cached.innerGet(
+                     val = oldVal = cached.innerGet(
                          null,
                          tx,
 -                        /*swap*/true,
                          readThrough,
                          /*metrics*/retVal,
                          /*event*/evt,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f750840,b291bd2..be32767
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@@ -2779,12 -2809,10 +2819,12 @@@ public class GridDhtAtomicCache<K, V> e
       *      locks are released.
       */
      @SuppressWarnings("ForLoopReplaceableByForEach")
-     private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, AffinityTopologyVersion topVer)
+     private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer)
          throws GridDhtInvalidPartitionException {
 +        ctx.shared().database().checkpointReadLock();
 +
-         if (keys.size() == 1) {
-             KeyCacheObject key = keys.get(0);
+         if (req.size() == 1) {
+             KeyCacheObject key = req.key(0);
  
              while (true) {
                  try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 058d4aa,efb35c4..98ab764
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@@ -23,18 -23,9 +23,14 @@@ import java.util.HashMap
  import java.util.List;
  import java.util.Map;
  import java.util.UUID;
 +import java.util.concurrent.atomic.AtomicReference;
 +import javax.cache.processor.EntryProcessor;
 +import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.IgniteSystemProperties;
  import org.apache.ignite.cache.CacheWriteSynchronizationMode;
  import org.apache.ignite.cluster.ClusterNode;
- import org.apache.ignite.internal.IgniteInternalFuture;
- import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
- import org.apache.ignite.internal.processors.cache.CacheObject;
- import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
  import org.apache.ignite.internal.processors.cache.GridCacheContext;
  import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
  import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@@ -48,12 -35,9 +40,12 @@@ import org.apache.ignite.internal.util.
  import org.apache.ignite.internal.util.typedef.F;
  import org.apache.ignite.internal.util.typedef.internal.S;
  import org.apache.ignite.internal.util.typedef.internal.U;
- import org.apache.ignite.lang.IgniteUuid;
+ import org.jetbrains.annotations.NotNull;
  import org.jetbrains.annotations.Nullable;
  
 +import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_MAX_CONCURRENT_DHT_UPDATES;
 +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 +
  /**
   * DHT atomic cache backup update future.
   */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 0000000,87d9225..84c2109
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@@ -1,0 -1,1025 +1,1046 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+ 
+ import java.io.Externalizable;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
+ import java.util.UUID;
+ import javax.cache.expiry.ExpiryPolicy;
+ import javax.cache.processor.EntryProcessor;
+ import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.IgniteLogger;
+ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+ import org.apache.ignite.internal.GridDirectCollection;
+ import org.apache.ignite.internal.GridDirectTransient;
+ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+ import org.apache.ignite.internal.processors.cache.CacheObject;
+ import org.apache.ignite.internal.processors.cache.GridCacheContext;
+ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
+ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+ import org.apache.ignite.internal.util.GridLongList;
+ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+ import org.apache.ignite.internal.util.typedef.F;
+ import org.apache.ignite.internal.util.typedef.internal.CU;
+ import org.apache.ignite.internal.util.typedef.internal.S;
+ import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+ import org.apache.ignite.plugin.extensions.communication.MessageReader;
+ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+ import org.jetbrains.annotations.NotNull;
+ import org.jetbrains.annotations.Nullable;
+ 
+ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+ 
+ /**
+  * Lite DHT cache update request sent from near node to primary node.
+  */
+ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdateRequest {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Target node ID. */
+     @GridDirectTransient
+     private UUID nodeId;
+ 
+     /** Future version. */
+     private GridCacheVersion futVer;
+ 
+     /** Update version. Set to non-null if fastMap is {@code true}. */
+     private GridCacheVersion updateVer;
+ 
+     /** Topology version. */
+     private AffinityTopologyVersion topVer;
+ 
+     /** Write synchronization mode. */
+     private CacheWriteSynchronizationMode syncMode;
+ 
+     /** Update operation. */
+     private GridCacheOperation op;
+ 
+     /** Subject ID. */
+     protected UUID subjId;
+ 
+     /** Task name hash. */
+     protected int taskNameHash;
+ 
+     /** */
+     @GridDirectTransient
+     private GridNearAtomicUpdateResponse res;
+ 
+     /** Fast map flag. */
+     protected boolean fastMap;
+ 
+     /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+     protected boolean topLocked;
+ 
+     /** Flag indicating whether request contains primary keys. */
+     protected boolean hasPrimary;
+ 
+     /** Skip write-through to a persistent storage. */
+     protected boolean skipStore;
+ 
+     /** */
+     protected boolean clientReq;
+ 
+     /** Keep binary flag. */
+     protected boolean keepBinary;
+ 
+     /** Return value flag. */
+     protected boolean retval;
+ 
++    /** */
++    protected boolean recovery;
++
+     /** Keys to update. */
+     @GridToStringInclude
+     @GridDirectCollection(KeyCacheObject.class)
+     private List<KeyCacheObject> keys;
+ 
+     /** Values to update. */
+     @GridDirectCollection(CacheObject.class)
+     private List<CacheObject> vals;
+ 
+     /** Partitions of keys. */
+     @GridDirectCollection(int.class)
+     private List<Integer> partIds;
+ 
+     /** Entry processors. */
+     @GridDirectTransient
+     private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+ 
+     /** Entry processors bytes. */
+     @GridDirectCollection(byte[].class)
+     private List<byte[]> entryProcessorsBytes;
+ 
+     /** Conflict versions. */
+     @GridDirectCollection(GridCacheVersion.class)
+     private List<GridCacheVersion> conflictVers;
+ 
+     /** Conflict TTLs. */
+     private GridLongList conflictTtls;
+ 
+     /** Conflict expire times. */
+     private GridLongList conflictExpireTimes;
 -
+     /** Optional arguments for entry processor. */
+     @GridDirectTransient
+     private Object[] invokeArgs;
+ 
+     /** Entry processor arguments bytes. */
+     private byte[][] invokeArgsBytes;
+ 
+     /** Expiry policy. */
+     @GridDirectTransient
+     private ExpiryPolicy expiryPlc;
+ 
+     /** Expiry policy bytes. */
+     private byte[] expiryPlcBytes;
+ 
+     /** Filter. */
+     private CacheEntryPredicate[] filter;
+ 
+     /** Initial capacity of inner collections (upper bound of entries count, but not greater than 10). */
+     @GridDirectTransient
+     private int initSize;
+ 
+     /**
+      * Empty constructor required by {@link Externalizable}.
+      */
+     public GridNearAtomicFullUpdateRequest() {
+         // No-op.
+     }
+ 
+     /**
+      * Constructor.
+      *
+      * @param cacheId Cache ID.
+      * @param nodeId Node ID.
+      * @param futVer Future version.
+      * @param fastMap Fast map scheme flag.
+      * @param updateVer Update version set if fast map is performed.
+      * @param topVer Topology version.
+      * @param topLocked Topology locked flag.
+      * @param syncMode Synchronization mode.
+      * @param op Cache update operation.
+      * @param retval Return value required flag.
+      * @param expiryPlc Expiry policy.
+      * @param invokeArgs Optional arguments for entry processor.
+      * @param filter Optional filter for atomic check.
+      * @param subjId Subject ID.
+      * @param taskNameHash Task name hash code.
+      * @param skipStore Skip write-through to a persistent storage.
+      * @param keepBinary Keep binary flag.
+      * @param clientReq Client node request flag.
+      * @param addDepInfo Deployment info flag.
+      * @param maxEntryCnt Maximum entries count.
+      */
+     GridNearAtomicFullUpdateRequest(
+         int cacheId,
+         UUID nodeId,
+         GridCacheVersion futVer,
+         boolean fastMap,
+         @Nullable GridCacheVersion updateVer,
+         @NotNull AffinityTopologyVersion topVer,
+         boolean topLocked,
+         CacheWriteSynchronizationMode syncMode,
+         GridCacheOperation op,
+         boolean retval,
+         @Nullable ExpiryPolicy expiryPlc,
+         @Nullable Object[] invokeArgs,
+         @Nullable CacheEntryPredicate[] filter,
+         @Nullable UUID subjId,
+         int taskNameHash,
+         boolean skipStore,
+         boolean keepBinary,
++        boolean recovery,
+         boolean clientReq,
+         boolean addDepInfo,
+         int maxEntryCnt
+     ) {
+         assert futVer != null;
+ 
+         this.cacheId = cacheId;
+         this.nodeId = nodeId;
+         this.futVer = futVer;
+         this.fastMap = fastMap;
+         this.updateVer = updateVer;
+ 
+         this.topVer = topVer;
+         this.topLocked = topLocked;
+         this.syncMode = syncMode;
+         this.op = op;
+         this.retval = retval;
+         this.expiryPlc = expiryPlc;
+         this.invokeArgs = invokeArgs;
+         this.filter = filter;
+         this.subjId = subjId;
+         this.taskNameHash = taskNameHash;
+         this.skipStore = skipStore;
+         this.keepBinary = keepBinary;
++        this.recovery = recovery;
+         this.clientReq = clientReq;
+         this.addDepInfo = addDepInfo;
+ 
+         // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
+         // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
+         // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
+         // than 10, we use it.
+         initSize = Math.min(maxEntryCnt, 10);
+ 
+         keys = new ArrayList<>(initSize);
+ 
+         partIds = new ArrayList<>(initSize);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int lookupIndex() {
+         return CACHE_MSG_IDX;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public UUID nodeId() {
+         return nodeId;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void nodeId(UUID nodeId) {
+         this.nodeId = nodeId;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public UUID subjectId() {
+         return subjId;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int taskNameHash() {
+         return taskNameHash;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheVersion futureVersion() {
+         return futVer;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheVersion updateVersion() {
+         return updateVer;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public AffinityTopologyVersion topologyVersion() {
+         return topVer;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+         return syncMode;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCacheOperation operation() {
+         return op;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+         if (this.res == null) {
+             this.res = res;
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override @Nullable public GridNearAtomicUpdateResponse response() {
+         return res;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean addDeploymentInfo() {
+         return addDepInfo;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+         return ctx.atomicMessageLogger();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void addUpdateEntry(KeyCacheObject key,
+         @Nullable Object val,
+         long conflictTtl,
+         long conflictExpireTime,
+         @Nullable GridCacheVersion conflictVer,
+         boolean primary) {
+         EntryProcessor<Object, Object, Object> entryProcessor = null;
+ 
+         if (op == TRANSFORM) {
+             assert val instanceof EntryProcessor : val;
+ 
+             entryProcessor = (EntryProcessor<Object, Object, Object>)val;
+         }
+ 
+         assert val != null || op == DELETE;
+ 
+         keys.add(key);
+         partIds.add(key.partition());
+ 
+         if (entryProcessor != null) {
+             if (entryProcessors == null)
+                 entryProcessors = new ArrayList<>(initSize);
+ 
+             entryProcessors.add(entryProcessor);
+         }
+         else if (val != null) {
+             assert val instanceof CacheObject : val;
+ 
+             if (vals == null)
+                 vals = new ArrayList<>(initSize);
+ 
+             vals.add((CacheObject)val);
+         }
+ 
+         hasPrimary |= primary;
+ 
+         // In case there is no conflict, do not create the list.
+         if (conflictVer != null) {
+             if (conflictVers == null) {
+                 conflictVers = new ArrayList<>(initSize);
+ 
+                 for (int i = 0; i < keys.size() - 1; i++)
+                     conflictVers.add(null);
+             }
+ 
+             conflictVers.add(conflictVer);
+         }
+         else if (conflictVers != null)
+             conflictVers.add(null);
+ 
+         if (conflictTtl >= 0) {
+             if (conflictTtls == null) {
+                 conflictTtls = new GridLongList(keys.size());
+ 
+                 for (int i = 0; i < keys.size() - 1; i++)
+                     conflictTtls.add(CU.TTL_NOT_CHANGED);
+             }
+ 
+             conflictTtls.add(conflictTtl);
+         }
+ 
+         if (conflictExpireTime >= 0) {
+             if (conflictExpireTimes == null) {
+                 conflictExpireTimes = new GridLongList(keys.size());
+ 
+                 for (int i = 0; i < keys.size() - 1; i++)
+                     conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+             }
+ 
+             conflictExpireTimes.add(conflictExpireTime);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public List<KeyCacheObject> keys() {
+         return keys;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int size() {
+         return keys != null ? keys.size() : 0;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public KeyCacheObject key(int idx) {
+         return keys.get(idx);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public List<?> values() {
+         return op == TRANSFORM ? entryProcessors : vals;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public CacheObject value(int idx) {
+         assert op == UPDATE : op;
+ 
+         return vals.get(idx);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+         assert op == TRANSFORM : op;
+ 
+         return entryProcessors.get(idx);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public CacheObject writeValue(int idx) {
+         if (vals != null)
+             return vals.get(idx);
+ 
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override @Nullable public List<GridCacheVersion> conflictVersions() {
+         return conflictVers;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+         if (conflictVers != null) {
+             assert idx >= 0 && idx < conflictVers.size();
+ 
+             return conflictVers.get(idx);
+         }
+ 
+         return null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long conflictTtl(int idx) {
+         if (conflictTtls != null) {
+             assert idx >= 0 && idx < conflictTtls.size();
+ 
+             return conflictTtls.get(idx);
+         }
+ 
+         return CU.TTL_NOT_CHANGED;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long conflictExpireTime(int idx) {
+         if (conflictExpireTimes != null) {
+             assert idx >= 0 && idx < conflictExpireTimes.size();
+ 
+             return conflictExpireTimes.get(idx);
+         }
+ 
+         return CU.EXPIRE_TIME_CALCULATE;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override @Nullable public Object[] invokeArguments() {
+         return invokeArgs;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean fastMap() {
+         return fastMap;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean topologyLocked() {
+         return topLocked;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean clientRequest() {
+         return clientReq;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean returnValue() {
+         return retval;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean skipStore() {
+         return skipStore;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean keepBinary() {
+         return keepBinary;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean hasPrimary() {
+         return hasPrimary;
+     }
+ 
+     /** {@inheritDoc} */
++    @Override public boolean recovery() {
++        return recovery;
++    }
++
++    /** {@inheritDoc} */
+     @Override @Nullable public CacheEntryPredicate[] filter() {
+         return filter;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public ExpiryPolicy expiry() {
+         return expiryPlc;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+         super.prepareMarshal(ctx);
+ 
+         GridCacheContext cctx = ctx.cacheContext(cacheId);
+ 
+         if (expiryPlc != null && expiryPlcBytes == null)
+             expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+ 
+         prepareMarshalCacheObjects(keys, cctx);
+ 
+         if (filter != null) {
+             boolean hasFilter = false;
+ 
+             for (CacheEntryPredicate p : filter) {
+                 if (p != null) {
+                     hasFilter = true;
+ 
+                     p.prepareMarshal(cctx);
+                 }
+             }
+ 
+             if (!hasFilter)
+                 filter = null;
+         }
+ 
+         if (op == TRANSFORM) {
+             // Force addition of deployment info for entry processors if P2P is enabled globally.
+             if (!addDepInfo && ctx.deploymentEnabled())
+                 addDepInfo = true;
+ 
+             if (entryProcessorsBytes == null)
+                 entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+ 
+             if (invokeArgsBytes == null)
+                 invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+         }
+         else
+             prepareMarshalCacheObjects(vals, cctx);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+         super.finishUnmarshal(ctx, ldr);
+ 
+         GridCacheContext cctx = ctx.cacheContext(cacheId);
+ 
+         if (expiryPlcBytes != null && expiryPlc == null)
+             expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ 
+         finishUnmarshalCacheObjects(keys, cctx, ldr);
+ 
+         if (filter != null) {
+             for (CacheEntryPredicate p : filter) {
+                 if (p != null)
+                     p.finishUnmarshal(cctx, ldr);
+             }
+         }
+ 
+         if (op == TRANSFORM) {
+             if (entryProcessors == null)
+                 entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+ 
+             if (invokeArgs == null)
+                 invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+         }
+         else
+             finishUnmarshalCacheObjects(vals, cctx, ldr);
+ 
+         if (partIds != null && !partIds.isEmpty()) {
+             assert partIds.size() == keys.size();
+ 
+             for (int i = 0; i < keys.size(); i++)
+                 keys.get(i).partition(partIds.get(i));
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int partition() {
+         return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+         writer.setBuffer(buf);
+ 
+         if (!super.writeTo(buf, writer))
+             return false;
+ 
+         if (!writer.isHeaderWritten()) {
+             if (!writer.writeHeader(directType(), fieldsCount()))
+                 return false;
+ 
+             writer.onHeaderWritten();
+         }
+ 
+         switch (writer.state()) {
+             case 3:
+                 if (!writer.writeBoolean("clientReq", clientReq))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 4:
+                 if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 5:
+                 if (!writer.writeMessage("conflictTtls", conflictTtls))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 6:
+                 if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 7:
+                 if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 8:
+                 if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 9:
+                 if (!writer.writeBoolean("fastMap", fastMap))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 10:
+                 if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 11:
+                 if (!writer.writeMessage("futVer", futVer))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 12:
+                 if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 13:
+                 if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 14:
+                 if (!writer.writeBoolean("keepBinary", keepBinary))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 15:
+                 if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 16:
+                 if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 17:
+                 if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 18:
 -                if (!writer.writeBoolean("retval", retval))
++                if (!writer.writeBoolean("recovery", recovery))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 19:
 -                if (!writer.writeBoolean("skipStore", skipStore))
++                if (!writer.writeBoolean("retval", retval))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 20:
 -                if (!writer.writeUuid("subjId", subjId))
++                if (!writer.writeBoolean("skipStore", skipStore))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 21:
 -                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
++                if (!writer.writeUuid("subjId", subjId))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 22:
 -                if (!writer.writeInt("taskNameHash", taskNameHash))
++                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 23:
 -                if (!writer.writeBoolean("topLocked", topLocked))
++                if (!writer.writeInt("taskNameHash", taskNameHash))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 24:
 -                if (!writer.writeMessage("topVer", topVer))
++                if (!writer.writeBoolean("topLocked", topLocked))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 25:
 -                if (!writer.writeMessage("updateVer", updateVer))
++                if (!writer.writeMessage("topVer", topVer))
+                     return false;
+ 
+                 writer.incrementState();
+ 
+             case 26:
 -                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
++                if (!writer.writeMessage("updateVer", updateVer))
+                     return false;
+ 
+                 writer.incrementState();
+ 
++            case 27:
++                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
++                    return false;
++
++                writer.incrementState();
+         }
+ 
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+         reader.setBuffer(buf);
+ 
+         if (!reader.beforeMessageRead())
+             return false;
+ 
+         if (!super.readFrom(buf, reader))
+             return false;
+ 
+         switch (reader.state()) {
+             case 3:
+                 clientReq = reader.readBoolean("clientReq");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 4:
+                 conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 5:
+                 conflictTtls = reader.readMessage("conflictTtls");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 6:
+                 conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 7:
+                 entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 8:
+                 expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 9:
+                 fastMap = reader.readBoolean("fastMap");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 10:
+                 filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 11:
+                 futVer = reader.readMessage("futVer");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 12:
+                 hasPrimary = reader.readBoolean("hasPrimary");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 13:
+                 invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 14:
+                 keepBinary = reader.readBoolean("keepBinary");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 15:
+                 keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 16:
+                 byte opOrd;
+ 
+                 opOrd = reader.readByte("op");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 op = GridCacheOperation.fromOrdinal(opOrd);
+ 
+                 reader.incrementState();
+ 
+             case 17:
+                 partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 18:
 -                retval = reader.readBoolean("retval");
++                recovery = reader.readBoolean("recovery");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 19:
 -                skipStore = reader.readBoolean("skipStore");
++                retval = reader.readBoolean("retval");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 20:
 -                subjId = reader.readUuid("subjId");
++                skipStore = reader.readBoolean("skipStore");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
+             case 21:
++                subjId = reader.readUuid("subjId");
++
++                if (!reader.isLastRead())
++                    return false;
++
++                reader.incrementState();
++
++            case 22:
+                 byte syncModeOrd;
+ 
+                 syncModeOrd = reader.readByte("syncMode");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+ 
+                 reader.incrementState();
+ 
 -            case 22:
++            case 23:
+                 taskNameHash = reader.readInt("taskNameHash");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
 -            case 23:
++            case 24:
+                 topLocked = reader.readBoolean("topLocked");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
 -            case 24:
++            case 25:
+                 topVer = reader.readMessage("topVer");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
 -            case 25:
++            case 26:
+                 updateVer = reader.readMessage("updateVer");
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
+ 
 -            case 26:
++            case 27:
+                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+ 
+                 if (!reader.isLastRead())
+                     return false;
+ 
+                 reader.incrementState();
 -
+         }
+ 
+         return reader.afterMessageRead(GridNearAtomicFullUpdateRequest.class);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void cleanup(boolean clearKeys) {
+         vals = null;
+         entryProcessors = null;
+         entryProcessorsBytes = null;
+         invokeArgs = null;
+         invokeArgsBytes = null;
+ 
+         if (clearKeys)
+             keys = null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public byte directType() {
+         return 40;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public byte fieldsCount() {
 -        return 27;
++        return 28;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridNearAtomicFullUpdateRequest.class, this, "filter", Arrays.toString(filter),
+             "parent", super.toString());
+     }
+ }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 4860eb4,bd231cf..fd2479c
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@@ -561,28 -563,94 +565,97 @@@ public class GridNearAtomicSingleUpdate
              throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                  "left the grid).");
  
-         GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
-             cctx.cacheId(),
-             primary.id(),
-             futVer,
-             false,
-             updVer,
-             topVer,
-             topLocked,
-             syncMode,
-             op,
-             retval,
-             expiryPlc,
-             invokeArgs,
-             filter,
-             subjId,
-             taskNameHash,
-             skipStore,
-             keepBinary,
-             recovery,
-             cctx.kernalContext().clientNode(),
-             cctx.deploymentEnabled(),
-             1);
+         GridNearAtomicAbstractUpdateRequest req;
+ 
+         if (canUseSingleRequest(primary)) {
+             if (op == TRANSFORM) {
+                 req = new GridNearAtomicSingleUpdateInvokeRequest(
+                     cctx.cacheId(),
+                     primary.id(),
+                     futVer,
+                     false,
+                     updVer,
+                     topVer,
+                     topLocked,
+                     syncMode,
+                     op,
+                     retval,
+                     invokeArgs,
+                     subjId,
+                     taskNameHash,
+                     skipStore,
+                     keepBinary,
+                     cctx.kernalContext().clientNode(),
+                     cctx.deploymentEnabled());
+             }
+             else {
+                 if (filter == null || filter.length == 0) {
+                     req = new GridNearAtomicSingleUpdateRequest(
+                         cctx.cacheId(),
+                         primary.id(),
+                         futVer,
+                         false,
+                         updVer,
+                         topVer,
+                         topLocked,
+                         syncMode,
+                         op,
+                         retval,
+                         subjId,
+                         taskNameHash,
+                         skipStore,
+                         keepBinary,
++                        recovery,
+                         cctx.kernalContext().clientNode(),
+                         cctx.deploymentEnabled());
+                 }
+                 else {
+                     req = new GridNearAtomicSingleUpdateFilterRequest(
+                         cctx.cacheId(),
+                         primary.id(),
+                         futVer,
+                         false,
+                         updVer,
+                         topVer,
+                         topLocked,
+                         syncMode,
+                         op,
+                         retval,
+                         filter,
+                         subjId,
+                         taskNameHash,
+                         skipStore,
+                         keepBinary,
++                        recovery,
+                         cctx.kernalContext().clientNode(),
+                         cctx.deploymentEnabled());
+                 }
+             }
+         }
+         else {
+             req = new GridNearAtomicFullUpdateRequest(
+                 cctx.cacheId(),
+                 primary.id(),
+                 futVer,
+                 false,
+                 updVer,
+                 topVer,
+                 topLocked,
+                 syncMode,
+                 op,
+                 retval,
+                 expiryPlc,
+                 invokeArgs,
+                 filter,
+                 subjId,
+                 taskNameHash,
+                 skipStore,
+                 keepBinary,
++                recovery,
+                 cctx.kernalContext().clientNode(),
+                 cctx.deploymentEnabled(),
+                 1);
+         }
  
          req.addUpdateEntry(cacheKey,
              val,

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 345e0d1,6e69161..f1e2c01
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@@ -19,11 -19,10 +19,12 @@@ package org.apache.ignite.internal.proc
  
  import java.io.Externalizable;
  import java.nio.ByteBuffer;
 +import java.util.Map;
  import org.apache.ignite.internal.processors.cache.GridCacheMessage;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 +import org.apache.ignite.internal.util.typedef.T2;
  import org.apache.ignite.internal.util.typedef.internal.S;
+ import org.apache.ignite.lang.IgniteProductVersion;
  import org.apache.ignite.plugin.extensions.communication.MessageReader;
  import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  import org.jetbrains.annotations.Nullable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 6282d03,4f34401..d2b893f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@@ -1101,8 -984,10 +1068,8 @@@ public class GridDhtPartitionsExchangeF
       * @throws IgniteCheckedException If failed.
       */
      private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException {
-         GridDhtPartitionsFullMessage m = createPartitionsMessage();
+         GridDhtPartitionsFullMessage m = createPartitionsMessage(nodes, true);
  
 -        assert !nodes.contains(cctx.localNode());
 -
          if (log.isDebugEnabled())
              log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
                  ", exchId=" + exchId + ", msg=" + m + ']');
@@@ -1553,32 -1265,15 +1557,37 @@@
              if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
                  for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                      if (!cacheCtx.isLocal())
 -                        cacheCtx.topology().beforeExchange(GridDhtPartitionsExchangeFuture.this, !centralizedAff);
 +                        cacheCtx.topology().beforeExchange(this, !centralizedAff);
 +                }
 +            }
 +
 +            if (discoEvt.type() == EVT_NODE_JOINED) {
 +                if (cctx.cache().globalState() != CacheState.INACTIVE)
 +                    assignPartitionsStates();
 +            }
 +            else if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
 +                assert discoEvt instanceof DiscoveryCustomEvent;
 +
 +                if (((DiscoveryCustomEvent)discoEvt).customMessage() instanceof DynamicCacheChangeBatch) {
 +                    DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)((DiscoveryCustomEvent)discoEvt)
 +                        .customMessage();
 +
 +                    for (DynamicCacheChangeRequest req : batch.requests()) {
 +                        if (req.resetLostPartitions())
 +                            resetLostPartitions();
 +                        else if (req.globalStateChange() && req.state() != CacheState.INACTIVE)
 +                            assignPartitionsStates();
 +                    }
                  }
              }
 +            else if (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED)
 +                detectLostPartitions();
  
+             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                 if (!cacheCtx.isLocal())
+                     cacheCtx.topology().checkEvictions();
+             }
+ 
              updateLastVersion(cctx.versions().last());
  
              cctx.versions().onExchange(lastVer.get().order());
@@@ -1944,7 -1613,9 +1953,9 @@@
  
                          if (crd0.isLocal()) {
                              if (allReceived) {
+                                 awaitSingleMapUpdates();
+ 
 -                                onAllReceived(true);
 +                                onAllReceived();
  
                                  return;
                              }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 3d91468,90d6242..5a30f95
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@@ -26,11 -29,12 +29,13 @@@ import org.apache.ignite.internal.GridD
  import org.apache.ignite.internal.GridDirectTransient;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 +import org.apache.ignite.internal.util.typedef.T2;
  import org.apache.ignite.internal.util.typedef.internal.S;
  import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
  import org.apache.ignite.plugin.extensions.communication.MessageReader;
  import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  import org.jetbrains.annotations.NotNull;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 5e4b1c4,bf08f0a..0975a07
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@@ -23,13 -23,16 +23,17 @@@ import java.util.Collections
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.ignite.IgniteCheckedException;
+ import org.apache.ignite.internal.GridDirectMap;
  import org.apache.ignite.internal.GridDirectTransient;
  import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
  import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 +import org.apache.ignite.internal.util.typedef.T2;
+ import org.apache.ignite.internal.util.typedef.F;
  import org.apache.ignite.internal.util.typedef.internal.S;
  import org.apache.ignite.internal.util.typedef.internal.U;
+ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
  import org.apache.ignite.plugin.extensions.communication.MessageReader;
  import org.apache.ignite.plugin.extensions.communication.MessageWriter;
  import org.jetbrains.annotations.Nullable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index deb1731,41bc2fc..cf02071
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@@ -35,10 -34,9 +34,10 @@@ import org.apache.ignite.internal.Ignit
  import org.apache.ignite.internal.NodeStoppingException;
  import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
  import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
- import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
  import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
 +import org.apache.ignite.internal.processors.cache.CacheState;
  import org.apache.ignite.internal.processors.cache.GridCacheContext;
  import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
  import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@@ -412,9 -412,12 +413,13 @@@ public class GridDhtPreloader extends G
      }
  
      /** {@inheritDoc} */
-     @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
-         boolean forcePreload, Collection<String> caches, int cnt, @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
-         return demander.addAssignments(assignments, forcePreload, caches, cnt, forcedRebFut);
+     @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+         boolean forceRebalance,
++        Collection<String> caches,
+         int cnt,
+         Runnable next,
+         @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
+         return demander.addAssignments(assignments, forceRebalance, cnt, next, forcedRebFut);
      }
  
      /**
@@@ -763,12 -790,23 +787,27 @@@
                          try {
                              GridDhtLocalPartition part = partsToEvict.poll();
  
 -                            if (part != null)
 +                            if (part != null) {
-                                 part.tryEvict();
+                                 try {
+                                     part.tryEvict();
+                                 }
+                                 catch (Throwable ex) {
+                                     if (cctx.kernalContext().isStopping()) {
+                                         LT.warn(log, ex, "Partition eviction failed (current node is stopping).",
+                                             false,
+                                             true);
+ 
+                                         partsToEvict.clear();
+ 
+                                         return true;
+                                     }
+                                     else
+                                         LT.error(log, ex, "Partition eviction failed, this can cause grid hang.");
+                                 }
 +
 +                                if (part.state() != EVICTED)
 +                                    partsToEvict.push(part);
 +                            }
                          }
                          finally {
                              if (!partsToEvict.isEmptyx())

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index b4eeb11,d4decb4..e62bf60
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@@ -26,8 -26,10 +26,9 @@@ import java.util.ArrayDeque
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.Collections;
+ import java.util.Comparator;
  import java.util.HashMap;
  import java.util.HashSet;
 -import java.util.Iterator;
  import java.util.LinkedHashMap;
  import java.util.LinkedList;
  import java.util.List;
@@@ -52,9 -55,9 +54,10 @@@ import org.apache.ignite.events.Event
  import org.apache.ignite.internal.GridKernalContext;
  import org.apache.ignite.internal.IgniteInternalFuture;
  import org.apache.ignite.internal.IgniteKernal;
 +import org.apache.ignite.internal.NodeStoppingException;
  import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
  import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
  import org.apache.ignite.internal.processors.cache.CacheObject;
  import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
@@@ -80,8 -83,10 +83,9 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.query.GridQueryProcessor;
  import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
  import org.apache.ignite.internal.processors.task.GridInternal;
+ import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
  import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
  import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 -import org.apache.ignite.internal.util.GridEmptyIterator;
  import org.apache.ignite.internal.util.GridLeanMap;
  import org.apache.ignite.internal.util.GridSpiCloseableIteratorWrapper;
  import org.apache.ignite.internal.util.GridSpinBusyLock;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index 31ad653,bb769c9..56cf271
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@@ -155,17 -157,17 +156,17 @@@ public class GridCacheSqlQuery implemen
          assert paramsBytes != null;
  
          try {
-         final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+             final ClassLoader ldr = U.resolveClassLoader(ctx.config());
  
-         if (m instanceof BinaryMarshaller)
-             // To avoid deserializing of enum types.
-             params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
-         else
-             params = m.unmarshal(paramsBytes, ldr);
-     }
+             if (m instanceof BinaryMarshaller)
+                 // To avoid deserializing of enum types.
+                 params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
+             else
+                 params = U.unmarshal(m, paramsBytes, ldr);
+         }
          catch (IgniteCheckedException e) {
 -            throw new IgniteException(e);
 -        }
 +        throw new IgniteException(e);
 +    }
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 064b725,e2fbf52..d2828e0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@@ -56,8 -56,7 +56,8 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
  import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
  import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 +import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
- import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
  import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
  import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
  import org.apache.ignite.internal.util.tostring.GridToStringInclude;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 762bf13,2706d4d..662a905
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@@ -1410,20 -1495,19 +1504,20 @@@ public class IgniteTxHandler 
                                          /*expiryPlc*/null,
                                          /*keepBinary*/true);
  
 -                                    if (val == null)
 -                                        val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
 +                                        if (val == null)
 +                                            val = cacheCtx.toCacheObject(cacheCtx.store().load(null, entry.key()));
  
 -                                    if (val != null)
 -                                        entry.readValue(val);
 +                                        if (val != null)
 +                                            entry.readValue(val);
  
 -                                    break;
 -                                }
 -                                catch (GridCacheEntryRemovedException e) {
 -                                    if (log.isDebugEnabled())
 -                                        log.debug("Got entry removed exception, will retry: " + entry.txKey());
 +                                        break;
 +                                    }
 +                                    catch (GridCacheEntryRemovedException ignore) {
 +                                        if (log.isDebugEnabled())
 +                                            log.debug("Got entry removed exception, will retry: " + entry.txKey());
  
-                                         entry.cached(null);
 -                                    entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
++                                        entry.cached(cacheCtx.cache().entryEx(entry.key(), req.topologyVersion()));
 +                                    }
                                  }
                              }
                          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 86994b5,ba44655..95fa006
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -2458,11 -2418,11 +2449,14 @@@ public abstract class IgniteTxLocalAdap
                          }
                      }
                      else
 -                        old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet();
 +                        old = entry.rawGet();
 +
 +                    final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
 +                        entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
  
+                     final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE :
+                         entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE;
+ 
                      if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) {
                          ret.set(cacheCtx, old, false, keepBinary);
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------


Mime
View raw message