ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [12/50] [abbrv] ignite git commit: Merge with master - WIP.
Date Thu, 29 Dec 2016 09:37:16 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 4cbf50e,4d85c54..cb19875
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@@ -471,23 -483,34 +493,50 @@@ public final class IgniteSystemProperti
      @Deprecated
      public static final String IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES = "IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES";
  
+     /** */
+     public static final String IGNITE_IO_BALANCE_PERIOD = "IGNITE_IO_BALANCE_PERIOD";
+ 
+     /**
+      * When set to {@code true} fields are written by BinaryMarshaller in sorted order. Otherwise
+      * the natural order is used.
+      * <p>
+      * @deprecated Should be removed in Apache Ignite 2.0.
+      */
+     @Deprecated
+     public static final String IGNITE_BINARY_SORT_OBJECT_FIELDS = "IGNITE_BINARY_SORT_OBJECT_FIELDS";
+ 
+     /**
+      * Whether Ignite can access unaligned memory addresses.
+      * <p>
+      * Defaults to {@code false}, meaning that unaligned access will be performed only on x86 architecture.
+      */
+     public static final String IGNITE_MEMORY_UNALIGNED_ACCESS = "IGNITE_MEMORY_UNALIGNED_ACCESS";
+ 
+     /**
+      * When unsafe memory copy is performed below this threshold, Ignite will do it on per-byte basis instead of
+      * calling to Unsafe.copyMemory().
+      * <p>
+      * Defaults to 0, meaning that threshold is disabled.
+      */
+     public static final String IGNITE_MEMORY_PER_BYTE_COPY_THRESHOLD = "IGNITE_MEMORY_PER_BYTE_COPY_THRESHOLD";
+ 
      /**
 +     * When set to {@code true} fields are written by BinaryMarshaller in sorted order. Otherwise
 +     * the natural order is used.
 +     * <p>
 +     * @deprecated Should be removed in Apache Ignite 2.0.
 +     */
 +    @Deprecated
 +    public static final String IGNITE_BINARY_SORT_OBJECT_FIELDS = "IGNITE_BINARY_SORT_OBJECT_FIELDS";
 +
 +    /**
 +     * Whether Ignite can access unaligned memory addresses.
 +     * <p>
 +     * Defaults to {@code false}, meaning that unaligned access will be performed only on x86 architecture.
 +     */
 +    public static final String IGNITE_MEMORY_UNALIGNED_ACCESS = "IGNITE_MEMORY_UNALIGNED_ACCESS";
 +
 +    /**
       * Enforces singleton.
       */
      private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index e15c989,56fc5b4..f24a940
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@@ -329,6 -341,12 +336,9 @@@ public class CacheConfiguration<K, V> e
      /** Maximum batch size for write-behind cache store. */
      private int writeBehindBatchSize = DFLT_WRITE_BEHIND_BATCH_SIZE;
  
+     /** Maximum number of query iterators that can be stored. */
+     private int maxQryIterCnt = DFLT_MAX_QUERY_ITERATOR_CNT;
+ 
 -    /** Memory mode. */
 -    private CacheMemoryMode memMode = DFLT_MEMORY_MODE;
 -
      /** */
      private AffinityKeyMapper affMapper;
  
@@@ -454,9 -472,10 +467,10 @@@
          name = cc.getName();
          nearCfg = cc.getNearConfiguration();
          nodeFilter = cc.getNodeFilter();
 +        partitionLossPolicy = cc.getPartitionLossPolicy();
          pluginCfgs = cc.getPluginConfigurations();
          qryEntities = cc.getQueryEntities() == Collections.<QueryEntity>emptyList() ? null : cc.getQueryEntities();
+         qryDetailMetricsSz = cc.getQueryDetailMetricsSize();
          readFromBackup = cc.isReadFromBackup();
          rebalanceBatchSize = cc.getRebalanceBatchSize();
          rebalanceBatchesPrefetchCount = cc.getRebalanceBatchesPrefetchCount();
@@@ -1604,8 -1646,35 +1618,33 @@@
      }
  
      /**
+      * Gets maximum number of query iterators that can be stored. Iterators are stored to
+      * support query pagination when each page of data is sent to user's node only on demand.
+      * Increase this property if you are running and processing lots of queries in parallel.
+      * <p>
+      * Default value is {@link #DFLT_MAX_QUERY_ITERATOR_CNT}.
+      *
+      * @return Maximum number of query iterators that can be stored.
+      */
+     public int getMaxQueryIteratorsCount() {
+         return maxQryIterCnt;
+     }
+ 
+     /**
+      * Sets maximum number of query iterators that can be stored.
+      *
+      * @param maxQryIterCnt Maximum number of query iterators that can be stored.
+      * @return {@code this} for chaining.
+      */
+     public CacheConfiguration<K, V> setMaxQueryIteratorsCount(int maxQryIterCnt) {
+         this.maxQryIterCnt = maxQryIterCnt;
+ 
+         return this;
+     }
+ 
+     /**
       * Gets memory mode for cache. Memory mode helps control whether value is stored in on-heap memory,
       * off-heap memory, or swap space. Refer to {@link CacheMemoryMode} for more info.
 -     * <p>
 -     * Default value is {@link #DFLT_MEMORY_MODE}.
       *
       * @return Memory mode.
       */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 3f83972,927944f..cee71bb
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@@ -48,8 -50,10 +49,9 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.job.GridJobProcessor;
  import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
  import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
 -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
  import org.apache.ignite.internal.processors.platform.PlatformProcessor;
  import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+ import org.apache.ignite.internal.processors.pool.PoolProcessor;
  import org.apache.ignite.internal.processors.port.GridPortProcessor;
  import org.apache.ignite.internal.processors.query.GridQueryProcessor;
  import org.apache.ignite.internal.processors.resource.GridResourceProcessor;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 42084e6,a2ad1b2..6a4547d
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@@ -65,8 -67,10 +66,9 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
  import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
  import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
 -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor;
  import org.apache.ignite.internal.processors.platform.PlatformProcessor;
  import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+ import org.apache.ignite.internal.processors.pool.PoolProcessor;
  import org.apache.ignite.internal.processors.port.GridPortProcessor;
  import org.apache.ignite.internal.processors.query.GridQueryProcessor;
  import org.apache.ignite.internal.processors.resource.GridResourceProcessor;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index cbd7dd0,24ddcd2..7c1c484
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@@ -97,10 -100,7 +100,10 @@@ public enum GridTopic 
      TOPIC_TX,
  
      /** */
-     TOPIC_BACKUP,
 -    TOPIC_IO_TEST;
++    TOPIC_IO_TEST,
 +
 +    /** */
-     TOPIC_IO_TEST;
++    TOPIC_BACKUP;
  
      /** Enum values. */
      private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 29691e6,4972d1f..0d99f4c
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@@ -813,8 -835,15 +828,12 @@@ public class IgniteKernal implements Ig
  
              addHelper(IGFS_HELPER.create(F.isEmpty(cfg.getFileSystemConfiguration())));
  
+             addHelper(HADOOP_HELPER.createIfInClassPath(ctx, false));
+ 
              startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins));
  
+             startProcessor(new PoolProcessor(ctx));
+ 
 -            // Off-heap processor has no dependencies.
 -            startProcessor(new GridOffHeapProcessor(ctx));
 -
              // Closure processor should be started before all others
              // (except for resource processor), as many components can depend on it.
              startProcessor(new GridClosureProcessor(ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index f8a20c3,f32a753..da24e40
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@@ -93,7 -94,10 +95,8 @@@ import org.apache.ignite.spi.discovery.
  import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
  import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi;
  import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi;
+ import org.apache.ignite.spi.loadbalancing.LoadBalancingSpi;
  import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi;
 -import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
 -import org.apache.ignite.spi.swapspace.noop.NoopSwapSpaceSpi;
  import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
  import org.apache.ignite.thread.IgniteThread;
  import org.apache.ignite.thread.IgniteThreadPoolExecutor;
@@@ -1627,8 -1641,11 +1640,10 @@@ public class IgnitionEx 
                  ensureMultiInstanceSupport(myCfg.getCollisionSpi());
                  ensureMultiInstanceSupport(myCfg.getFailoverSpi());
                  ensureMultiInstanceSupport(myCfg.getLoadBalancingSpi());
 -                ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
              }
  
+             validateThreadPoolSize(cfg.getPublicThreadPoolSize(), "public");
+ 
              execSvc = new IgniteThreadPoolExecutor(
                  "pub",
                  cfg.getGridName(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
index 8624c64,69de3f2..7f3f5e4
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
@@@ -250,29 -248,7 +256,29 @@@ public class BinaryEnumObjectImpl imple
  
      /** {@inheritDoc} */
      @Override public byte[] valueBytes(CacheObjectContext cacheCtx) throws IgniteCheckedException {
 -        return U.marshal(ctx.marshaller(), this);
 +        if (valBytes != null)
 +            return valBytes;
 +
-         valBytes = ctx.marshaller().marshal(this);
++        valBytes = U.marshal(ctx.marshaller(), this);
 +
 +        return valBytes;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean putValue(ByteBuffer buf) throws IgniteCheckedException {
 +        assert valBytes != null : "Value bytes must be initialized before object is stored";
 +
 +        return putValue(buf, 0, objectPutSize(valBytes.length));
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public boolean putValue(final ByteBuffer buf, int off, int len) throws IgniteCheckedException {
 +        return CacheObjectAdapter.putValue(cacheObjectType(), buf, off, len, valBytes, 0);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException {
 +        return objectPutSize(valueBytes(ctx).length);
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java
index 4b59904,59e79fb..477ae37
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java
@@@ -60,20 -47,11 +60,19 @@@ public class BinaryFieldImpl implement
       * @param fieldName Field name.
       * @param fieldId Field ID.
       */
 -    public BinaryFieldImpl(int typeId, BinarySchemaRegistry schemas, String fieldName, int fieldId) {
 +    public BinaryFieldImpl(
 +        BinaryContext ctx,
 +        int typeId,
 +        BinarySchemaRegistry schemas,
 +        String fieldName,
 +        int fieldId
 +    ) {
 +        assert ctx != null;
          assert typeId != 0;
          assert schemas != null;
-         assert fieldName != null;
          assert fieldId != 0;
  
 +        this.ctx = ctx;
          this.typeId = typeId;
          this.schemas = schemas;
          this.fieldName = fieldName;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
index e6d8781,b80f573..33c7f08
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
@@@ -18,17 -18,20 +18,21 @@@
  package org.apache.ignite.internal.binary;
  
  import java.math.BigDecimal;
 +import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.IdentityHashMap;
+ import java.util.Iterator;
+ import java.util.Map;
  import org.apache.ignite.IgniteException;
+ import org.apache.ignite.binary.BinaryArrayIdentityResolver;
+ import org.apache.ignite.binary.BinaryObject;
  import org.apache.ignite.binary.BinaryObjectBuilder;
- import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
- import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
- import org.apache.ignite.internal.util.typedef.internal.SB;
  import org.apache.ignite.binary.BinaryObjectException;
+ import org.apache.ignite.binary.BinaryIdentityResolver;
  import org.apache.ignite.binary.BinaryType;
- import org.apache.ignite.binary.BinaryObject;
+ import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
+ import org.apache.ignite.internal.util.typedef.internal.SB;
+ import org.apache.ignite.lang.IgniteUuid;
  import org.jetbrains.annotations.Nullable;
  
  /**
@@@ -75,23 -78,35 +79,44 @@@ public abstract class BinaryObjectExImp
      }
  
      /**
+      * Get offset of data begin.
+      *
+      * @return Field value.
+      */
+     public abstract int dataStartOffset();
+ 
+     /**
+      * Get offset of the footer begin.
+      *
+      * @return Field value.
+      */
+     public abstract int footerStartOffset();
+ 
+     /**
       * Get field by offset.
       *
-      * @param fieldOffset Field offset.
+      * @param order Field offset.
       * @return Field value.
       */
-     @Nullable protected abstract <F> F fieldByOrder(int fieldOffset);
+     @Nullable public abstract <F> F fieldByOrder(int order);
+ 
+     /**
+      * Create field comparer.
+      *
+      * @return Comparer.
+      */
+     public abstract BinarySerializedFieldComparator createFieldComparator();
  
      /**
 +     * Writes field value defined by the given field offset to the given byte buffer.
 +     *
 +     * @param fieldOffset Field offset.
 +     * @return Boolean flag indicating whether the field was successfully written to the buffer, {@code false}
 +     *      if there is not enough space for the field in the buffer.
 +     */
 +    protected abstract boolean writeFieldByOrder(int fieldOffset, ByteBuffer buf);
 +
 +    /**
       * @param ctx Reader context.
       * @param fieldName Field name.
       * @return Field value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index bb7bf69,7ef7bc0..e0ff2a6
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -967,8 -804,17 +804,27 @@@ public class GridIoManager extends Grid
              }
          }
  
+         if (ctx.config().getStripedPoolSize() > 0 &&
+             plc == GridIoPolicy.SYSTEM_POOL &&
+             msg.partition() != Integer.MIN_VALUE
+             ) {
+             ctx.getStripedExecutorService().execute(msg.partition(), c);
+ 
+             return;
+         }
+ 
++        if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
++            IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
++
++            if (msg0.processFromNioThread()) {
++                c.run();
++
++                return;
++            }
++        }
++
          try {
-             pool(plc).execute(c);
+             pools.poolForPolicy(plc).execute(c);
          }
          catch (RejectedExecutionException e) {
              U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index f5c46a8,b1fe910..63d84c4
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@@ -161,11 -171,56 +172,61 @@@ public class GridIoMessageFactory imple
          Message msg = null;
  
          switch (type) {
+             case -44:
+                 msg = new TcpCommunicationSpi.HandshakeMessage2();
+ 
+                 break;
+ 
+             case -43:
+                 msg = new IgniteIoTestMessage();
+ 
+                 break;
+ 
+             case -42:
+                 msg = new HadoopDirectShuffleMessage();
+ 
+                 break;
+ 
+             case -41:
+                 msg = new HadoopShuffleFinishResponse();
+ 
+                 break;
+ 
+             case -40:
+                 msg = new HadoopShuffleFinishRequest();
+ 
+                 break;
+ 
+             case -39:
+                 msg = new HadoopJobId();
+ 
+                 break;
+ 
+             case -38:
+                 msg = new HadoopShuffleAck();
+ 
+                 break;
+ 
+             case -37:
+                 msg = new HadoopShuffleMessage();
+ 
+                 break;
+ 
+             case -36:
+                 msg = new GridDhtAtomicSingleUpdateRequest();
+ 
+                 break;
+ 
+             case -27:
+                 msg = new GridDhtTxOnePhaseCommitAckRequest();
+ 
+                 break;
+ 
 +            case -27:
 +                msg = new BackupFinishedMessage();
 +
 +                break;
 +
              case -26:
                  msg = new TxLockList();
  
@@@ -757,17 -812,21 +818,33 @@@
                  break;
  
              case 125:
+                 msg = new GridNearAtomicSingleUpdateRequest();
+ 
+                 break;
+ 
+             case 126:
+                 msg = new GridNearAtomicSingleUpdateInvokeRequest();
+ 
+                 break;
+ 
+             case 127:
+                 msg = new GridNearAtomicSingleUpdateFilterRequest();
+ 
+                 break;
+ 
+             // [-3..119] [124..127] [-36..-44]- this
++            case 125:
 +                msg = new TcpCommunicationSpi.HandshakeMessage2();
 +
 +                break;
 +
 +            // [-3..119] [124-125] - this
 +            case 126:
 +                msg = new IgniteIoTestMessage();
 +
 +                break;
 +
 +            // [-3..119] [124-126] - this
              // [120..123] - DR
              // [-4..-22, -30..-35] - SQL
              default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
index 985cf99,77aaa09..969ea39
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
@@@ -53,27 -54,28 +53,28 @@@ public class IgniteIoTestMessage implem
      }
  
      /**
-      * @param id ID.
-      * @param req {@code True} for request.
+      * @param id Message ID.
+      * @param req Request flag.
       * @param payload Payload.
       */
 -    IgniteIoTestMessage(long id, boolean req, byte[] payload) {
 +    public IgniteIoTestMessage(long id, boolean req, byte[] payload) {
          this.id = id;
          this.req = req;
          this.payload = payload;
      }
  
      /**
-      * @return Process from NIO thread flag.
+      * @return {@code True} if message should be processed from NIO thread
+      * (otherwise message is submitted to system pool).
       */
 -    boolean processFromNioThread() {
 +    public boolean processFromNioThread() {
          return isFlag(FLAG_PROC_FROM_NIO);
      }
  
      /**
-      * @param procFromNioThread Process from NIO thread flag.
+      * @param procFromNioThread {@code True} if message should be processed from NIO thread.
       */
 -    void processFromNioThread(boolean procFromNioThread) {
 +    public void processFromNioThread(boolean procFromNioThread) {
          setFlag(procFromNioThread, FLAG_PROC_FROM_NIO);
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index c36262f,88aa4e0..4c62371
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -1942,10 -1882,9 +1957,10 @@@ public abstract class GridCacheAdapter<
          @Nullable final UUID subjId,
          final String taskName,
          final boolean deserializeBinary,
-         @Nullable IgniteCacheExpiryPolicy expiry,
+         @Nullable final IgniteCacheExpiryPolicy expiry,
          final boolean skipVals,
          final boolean keepCacheObjects,
 +        final boolean recovery,
          boolean canRemap,
          final boolean needVer
      ) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index db10484,52b779d..5f45024
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -34,15 -35,15 +35,15 @@@ import org.apache.ignite.IgniteCheckedE
  import org.apache.ignite.IgniteException;
  import org.apache.ignite.IgniteLogger;
  import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 -import org.apache.ignite.cache.CacheMemoryMode;
  import org.apache.ignite.cache.eviction.EvictableEntry;
 -import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
 -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 +import org.apache.ignite.internal.pagemem.wal.StorageException;
 +import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 +import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 +import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
- import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
  import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
  import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
  import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@@ -891,7 -1196,14 +894,7 @@@ public abstract class GridCacheMapEntr
  
              assert newVer != null : "Failed to get write version for tx: " + tx;
  
-             old = this.val;
 -            boolean internal = isInternal() || !context().userCache();
 -
 -            Map<UUID, CacheContinuousQueryListener> lsnrCol =
 -                notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
 -
 -            old = oldValPresent ? oldVal :
 -                (retval || intercept || lsnrCol != null) ?
 -                rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val;
++            old = oldValPresent ? oldVal : this.val;
  
              if (intercept) {
                  val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false);
@@@ -1083,10 -1409,8 +1088,10 @@@
              Map<UUID, CacheContinuousQueryListener> lsnrCol =
                  notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
  
 -            old = oldValPresent ? oldVal : (retval || intercept || lsnrCol != null) ?
 -                rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val;
 +            if (startVer && (retval || intercept || lsnrCol != null))
 +                unswap();
 +
-             old = val;
++            old = oldValPresent ? oldVal : val;
  
              if (intercept) {
                  entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
@@@ -3122,35 -3559,47 +3125,35 @@@
  
          if (curVer == null || curVer.equals(ver)) {
              if (val != this.val) {
--                GridCacheMvcc mvcc = mvccExtras();
++                        GridCacheMvcc mvcc = mvccExtras();
  
--                if (mvcc != null && !mvcc.isEmpty())
--                    return null;
++                        if (mvcc != null && !mvcc.isEmpty())
++                            return null;
  
--                if (newVer == null)
--                    newVer = cctx.versions().next();
++                        if (newVer == null)
++                            newVer = cctx.versions().next();
  
-                 long ttl = ttlExtras();
 -                CacheObject old = rawGetOrUnmarshalUnlocked(false);
++                        long ttl = ttlExtras();
  
-                 long expTime = CU.toExpireTime(ttl);
 -                long ttl;
 -                long expTime;
++                        long expTime = CU.toExpireTime(ttl);
  
-                 // Detach value before index update.
-                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 -                if (loadExpiryPlc != null) {
 -                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(loadExpiryPlc);
++                        // Detach value before index update.
++                        val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
  
-                 if (val != null) {
-                     storeValue(val, expTime, newVer);
 -                    ttl = initTtlAndExpireTime.get1();
 -                    expTime = initTtlAndExpireTime.get2();
 -                }
 -                else {
 -                    ttl = ttlExtras();
 -                    expTime = expireTimeExtras();
 -                }
++                        if (val != null) {
++                            storeValue(val, expTime, newVer);
  
-                     if (deletedUnlocked())
-                         deletedUnlocked(false);
-                 }
 -                // Detach value before index update.
 -                val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
++                            if (deletedUnlocked())
++                                deletedUnlocked(false);
++                        }
  
-                 // Version does not change for load ops.
-                 update(val, expTime, ttl, newVer, true);
 -                if (val != null) {
 -                    updateIndex(val, expTime, newVer, old);
++                        // Version does not change for load ops.
++                        update(val, expTime, ttl, newVer, true);
  
-                 return newVer;
-             }
-         }
 -                    if (deletedUnlocked())
 -                        deletedUnlocked(false);
++                        return newVer;
++                    }
+                 }
  
 -                // Version does not change for load ops.
 -                update(val, expTime, ttl, newVer, true);
 -
 -                return newVer;
 -            }
 -        }
 -
          return null;
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index d03a90a,8ea2169..40035c0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@@ -54,16 -56,18 +56,18 @@@ import org.apache.ignite.internal.Ignit
  import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
  import org.apache.ignite.internal.events.DiscoveryCustomEvent;
  import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 +import org.apache.ignite.internal.pagemem.backup.StartFullBackupAckDiscoveryMessage;
  import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
  import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
  import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
@@@ -751,9 -750,17 +769,15 @@@ public class GridCachePartitionExchange
          if (log.isDebugEnabled())
              log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
  
 -        Collection<ClusterNode> rmts;
 -
          // If this is the oldest node.
          if (oldest.id().equals(cctx.localNodeId())) {
+             GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut;
+ 
+             // No need to send to nodes which did not finish their first exchange.
+             AffinityTopologyVersion rmtTopVer =
+                 lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE;
+ 
 -            rmts = CU.remoteNodes(cctx, rmtTopVer);
 +            Collection<ClusterNode> rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
  
              if (log.isDebugEnabled())
                  log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
@@@ -813,24 -935,11 +964,25 @@@
       * @param id ID.
       */
      private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) {
-         GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+         GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node,
+             id,
              cctx.kernalContext().clientNode(),
-             cctx.versions().last());
+             false);
  
 +        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
 +            if (!cacheCtx.isLocal()) {
 +                GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap();
 +
 +                m.addLocalPartitionMap(cacheCtx.cacheId(), locMap);
 +            }
 +        }
 +
 +        for (GridClientPartitionTopology top : clientTops.values()) {
 +            GridDhtPartitionMap2 locMap = top.localPartitionMap();
 +
 +            m.addLocalPartitionMap(top.cacheId(), locMap);
 +        }
 +
          if (log.isDebugEnabled())
              log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']');
  
@@@ -1395,17 -1594,13 +1644,13 @@@
          @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
              long timeout = cctx.gridConfig().getNetworkTimeout();
  
-             boolean startEvtFired = false;
- 
              int cnt = 0;
  
-             IgniteInternalFuture asyncStartFut = null;
- 
              while (!isCancelled()) {
 -                GridDhtPartitionsExchangeFuture exchFut = null;
 -
                  cnt++;
  
 +                GridDhtPartitionsExchangeFuture exchFut = null;
 +
                  try {
                      boolean preloadFinished = true;
  
@@@ -1580,27 -1762,26 +1813,34 @@@
                              for (Integer cacheId : orderMap.get(order)) {
                                  GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
  
+                                 GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
+ 
+                                 if (assigns != null)
+                                     assignsCancelled |= assigns.cancelled();
+ 
 +                                List<String> waitList = new ArrayList<>(size - 1);
 +
 +                                for (List<Integer> cIds : orderMap.headMap(order).values()) {
 +                                    for (Integer cId : cIds)
 +                                        waitList.add(cctx.cacheContext(cId).name());
 +                                }
 +
-                                 Callable<Boolean> r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId),
+                                 // Cancels previous rebalance future (in case it's not done yet).
+                                 // Sends previous rebalance stopped event (if necessary).
+                                 // Creates new rebalance future.
+                                 // Sends current rebalance started event (if necessary).
+                                 // Finishes cache sync future (on empty assignments).
+                                 Runnable cur = cacheCtx.preloader().addAssignments(assigns,
                                      forcePreload,
 +                                    waitList,
                                      cnt,
+                                     r,
                                      exchFut.forcedRebalanceFuture());
  
-                                 if (r != null) {
-                                     U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() +
-                                         ", waitList=" + waitList.toString() + "]");
+                                 if (cur != null) {
+                                     rebList.add(U.maskName(cacheCtx.name()));
  
-                                     if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME))
-                                         marshR = r;
-                                     else
-                                         orderedRs.add(r);
+                                     r = cur;
                                  }
                              }
                          }

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1ea2e12,0c28691..694e20f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@@ -84,14 -84,14 +84,15 @@@ public interface GridCachePreloader 
       *
       * @param assignments Assignments to add.
       * @param forcePreload Force preload flag.
-      * @param caches Rebalancing of these caches will be finished before this started.
       * @param cnt Counter.
-      * @return Rebalancing closure.
+      * @param next Runnable responsible for cache rebalancing start.
+      * @return Rebalancing runnable.
       */
-     public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+     public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
          boolean forcePreload,
 +        Collection<String> caches,
          int cnt,
+         Runnable next,
          @Nullable GridFutureAdapter<Boolean> forcedRebFut);
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 84e7727,d7ec288..d3eeed9
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@@ -160,8 -166,11 +166,12 @@@ public class GridCachePreloaderAdapter 
      }
  
      /** {@inheritDoc} */
-     @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
-         Collection<String> caches, int cnt, @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
+     @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+         boolean forcePreload,
++        Collection<String> caches,
+         int cnt,
+         Runnable next,
+         @Nullable GridFutureAdapter<Boolean> forcedRebFut) {
          return null;
      }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 179f5f8,0be2072..881d257
mode 100644,100755..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -1913,20 -1867,9 +1915,21 @@@ public class GridCacheProcessor extend
          GridCacheVersionManager verMgr = new GridCacheVersionManager();
          GridCacheDeploymentManager depMgr = new GridCacheDeploymentManager();
          GridCachePartitionExchangeManager exchMgr = new GridCachePartitionExchangeManager();
 +
 +        IgniteCacheDatabaseSharedManager dbMgr = ctx.plugins().createComponent(IgniteCacheDatabaseSharedManager.class);
 +
 +        if (dbMgr == null)
 +            dbMgr = new IgniteCacheDatabaseSharedManager();
 +
 +        IgnitePageStoreManager pageStoreMgr = ctx.plugins().createComponent(IgnitePageStoreManager.class);
 +        IgniteWriteAheadLogManager walMgr = ctx.plugins().createComponent(IgniteWriteAheadLogManager.class);
 +
 +        if (walMgr == null)
 +            walMgr = new IgniteWriteAheadLogNoopManager();
 +
          GridCacheIoManager ioMgr = new GridCacheIoManager();
          CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager();
+         GridCacheSharedTtlCleanupManager ttl = new GridCacheSharedTtlCleanupManager();
  
          CacheJtaManagerAdapter jta = JTA.createOptional();
  
@@@ -2477,44 -2437,8 +2512,45 @@@
      }
  
      /**
 +     * Resets cache state after the cache has been moved to recovery state.
 +     *
 +     * @param cacheName Cache name.
 +     * @return Future that will be completed when state is changed.
 +     */
 +    public IgniteInternalFuture<?> resetCacheState(String cacheName) {
 +        IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(cacheName));
 +
 +        if (proxy == null || proxy.proxyClosed())
 +            return new GridFinishedFuture<>(); // No-op.
 +
 +        checkEmptyTransactions();
 +
 +        DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId());
 +
 +        t.markResetLostPartitions();
 +
 +        return F.first(initiateCacheChanges(F.asList(t), false));
 +    }
 +
 +    /**
 +     * Changes global cluster state.
 +     *
 +     * @param state Cache state.
 +     * @return Future that will be completed when state is changed.
 +     */
 +    public IgniteInternalFuture<?> changeGlobalState(CacheState state) {
 +        checkEmptyTransactions();
 +
 +        DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), null, ctx.localNodeId());
 +
 +        t.state(state);
 +
 +        return F.first(initiateCacheChanges(F.asList(t), false));
 +    }
 +
 +    /**
       * @param reqs Requests.
+      * @param failIfExists Fail if exists flag.
       * @return Collection of futures.
       */
      @SuppressWarnings("TypeMayBeWeakened")

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index f01b6fd,117a5c3..384750f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@@ -173,7 -157,7 +178,7 @@@ public class GridCacheSharedContext<K, 
      ) {
          this.kernalCtx = kernalCtx;
  
-         setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, pageStoreMgr, walMgr, dbMgr, depMgr, exchMgr, affMgr, ioMgr);
 -        setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, affMgr, ioMgr, ttlMgr);
++        setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, pageStoreMgr, walMgr, dbMgr, depMgr, exchMgr, affMgr, ioMgr, ttlMgr);
  
          this.storeSesLsnrs = storeSesLsnrs;
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 7270f22,0f855fe..7ccf890
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@@ -98,30 -50,14 +98,36 @@@ public class GridCacheTtlManager extend
          if (cleanupDisabled)
              return;
  
 +        cleanupWorker = new CleanupWorker();
 +
 +        pendingEntries = cctx.config().getNearConfiguration() != null ? new GridConcurrentSkipListSetEx() : null;
++
+         cctx.shared().ttl().register(this);
      }
  
 +    /**
 +     * @return {@code True} if eager expired entries cleanup is enabled for cache.
 +     */
 +    public boolean eagerTtlEnabled() {
 +        assert cctx != null : "Manager not started";
 +
 +        return cleanupWorker != null;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override protected void onKernalStart0() throws IgniteCheckedException {
 +        if (cleanupWorker != null)
 +            new IgniteThread(cleanupWorker).start();
 +    }
 +
      /** {@inheritDoc} */
      @Override protected void onKernalStop0(boolean cancel) {
 +        U.cancel(cleanupWorker);
 +        U.join(cleanupWorker, log);
++
+         pendingEntries.clear();
+ 
+         cctx.shared().ttl().unregister(this);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 6460067,90898f9..a5a72a0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@@ -574,9 -558,9 +586,9 @@@ public class IgniteCacheProxy<K, V> ext
              if (grp != null)
                  qry.projection(grp);
  
-             fut = ctx.kernalContext().query().executeQuery(ctx,
+             fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(), ctx,
                  new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
 -                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException {
 +                    @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() {
                          return qry.execute();
                      }
                  }, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ac002fc,1d60c42..17df059
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@@ -85,7 -88,9 +86,8 @@@ import org.apache.ignite.internal.util.
  import org.apache.ignite.internal.util.lang.GridCloseableIterator;
  import org.apache.ignite.internal.util.lang.GridMapEntry;
  import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 -import org.apache.ignite.internal.util.typedef.C1;
  import org.apache.ignite.internal.util.typedef.F;
+ import org.apache.ignite.internal.util.typedef.T1;
  import org.apache.ignite.internal.util.typedef.T2;
  import org.apache.ignite.internal.util.typedef.X;
  import org.apache.ignite.internal.util.typedef.internal.CU;

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 690bfeb,4adfa8b..a8d3f2d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@@ -477,147 -484,77 +497,151 @@@ public class GridDistributedTxRemoteAda
  
                              boolean replicate = cacheCtx.isDrEnabled();
  
 -                            while (true) {
 -                                try {
 -                                    GridCacheEntryEx cached = txEntry.cached();
 +                            try {
 +                                while (true) {
 +                                    try {
 +                                        GridCacheEntryEx cached = txEntry.cached();
  
 -                                    if (cached == null)
 -                                        txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
 +                                        if (cached == null)
 +                                            txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
  
 -                                    if (near() && cacheCtx.dr().receiveEnabled()) {
 -                                        cached.markObsolete(xidVer);
 +                                        if (near() && cacheCtx.dr().receiveEnabled()) {
 +                                            cached.markObsolete(xidVer);
  
 -                                        break;
 -                                    }
 +                                            break;
 +                                        }
  
 -                                    GridNearCacheEntry nearCached = null;
 +                                        GridNearCacheEntry nearCached = null;
  
 -                                    if (updateNearCache(cacheCtx, txEntry.key(), topVer))
 -                                        nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
 +                                        if (updateNearCache(cacheCtx, txEntry.key(), topVer))
 +                                            nearCached = cacheCtx.dht().near().peekExx(txEntry.key());
  
 -                                    if (!F.isEmpty(txEntry.entryProcessors()))
 -                                        txEntry.cached().unswap(false);
 +                                        if (!F.isEmpty(txEntry.entryProcessors()))
 +                                            txEntry.cached().unswap(false);
  
 -                                    IgniteBiTuple<GridCacheOperation, CacheObject> res =
 -                                        applyTransformClosures(txEntry, false, ret);
 +                                        IgniteBiTuple<GridCacheOperation, CacheObject> res =
-                                             applyTransformClosures(txEntry, false);
++                                            applyTransformClosures(txEntry, false, ret);
  
 -                                    GridCacheOperation op = res.get1();
 -                                    CacheObject val = res.get2();
 +                                        GridCacheOperation op = res.get1();
 +                                        CacheObject val = res.get2();
  
 -                                    GridCacheVersion explicitVer = txEntry.conflictVersion();
 +                                        GridCacheVersion explicitVer = txEntry.conflictVersion();
  
 -                                    if (explicitVer == null)
 -                                        explicitVer = writeVersion();
 +                                        if (explicitVer == null)
 +                                            explicitVer = writeVersion();
  
 -                                    if (txEntry.ttl() == CU.TTL_ZERO)
 -                                        op = DELETE;
 +                                        if (txEntry.ttl() == CU.TTL_ZERO)
 +                                            op = DELETE;
  
 -                                    boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
 +                                        boolean conflictNeedResolve = cacheCtx.conflictNeedResolve();
  
 -                                    GridCacheVersionConflictContext conflictCtx = null;
 +                                        GridCacheVersionConflictContext conflictCtx = null;
  
 -                                    if (conflictNeedResolve) {
 -                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
 -                                            drRes = conflictResolve(op, txEntry, val, explicitVer, cached);
 +                                        if (conflictNeedResolve) {
 +                                            IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext>
 +                                                drRes = conflictResolve(op, txEntry, val, explicitVer, cached);
  
 -                                        assert drRes != null;
 +                                            assert drRes != null;
  
 -                                        conflictCtx = drRes.get2();
 +                                            conflictCtx = drRes.get2();
  
 -                                        if (conflictCtx.isUseOld())
 -                                            op = NOOP;
 -                                        else if (conflictCtx.isUseNew()) {
 -                                            txEntry.ttl(conflictCtx.ttl());
 -                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
 -                                        }
 -                                        else if (conflictCtx.isMerge()) {
 -                                            op = drRes.get1();
 -                                            val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
 -                                            explicitVer = writeVersion();
 +                                            if (conflictCtx.isUseOld())
 +                                                op = NOOP;
 +                                            else if (conflictCtx.isUseNew()) {
 +                                                txEntry.ttl(conflictCtx.ttl());
 +                                                txEntry.conflictExpireTime(conflictCtx.expireTime());
 +                                            }
 +                                            else if (conflictCtx.isMerge()) {
 +                                                op = drRes.get1();
 +                                                val = txEntry.context().toCacheObject(conflictCtx.mergeValue());
 +                                                explicitVer = writeVersion();
  
 -                                            txEntry.ttl(conflictCtx.ttl());
 -                                            txEntry.conflictExpireTime(conflictCtx.expireTime());
 +                                                txEntry.ttl(conflictCtx.ttl());
 +                                                txEntry.conflictExpireTime(conflictCtx.expireTime());
 +                                            }
 +                                        }
 +                                        else
 +                                            // Nullify explicit version so that innerSet/innerRemove will work as usual.
 +                                            explicitVer = null;
 +
 +                                        GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
 +
 +                                        if (!near() && cctx.wal() != null && op != NOOP && op != RELOAD && op != READ) {
 +                                            if (dataEntries == null)
 +                                                dataEntries = new ArrayList<>(entries.size());
 +
 +                                            dataEntries.add(
 +                                                new DataEntry(
 +                                                    cacheCtx.cacheId(),
 +                                                    txEntry.key(),
 +                                                    val,
 +                                                    op,
 +                                                    nearXidVersion(),
 +                                                    writeVersion(),
 +                                                    0,
 +                                                    txEntry.key().partition(),
 +                                                    txEntry.updateCounter()
 +                                                )
 +                                            );
                                          }
 -                                    }
 -                                    else
 -                                        // Nullify explicit version so that innerSet/innerRemove will work as usual.
 -                                        explicitVer = null;
 -
 -                                    GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
  
 -                                    if (op == CREATE || op == UPDATE) {
 -                                        // Invalidate only for near nodes (backups cannot be invalidated).
 -                                        if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
 +                                        if (op == CREATE || op == UPDATE) {
 +                                            // Invalidate only for near nodes (backups cannot be invalidated).
 +                                            if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear()))
 +                                                cached.innerRemove(this,
 +                                                    eventNodeId(),
 +                                                    nodeId,
 +                                                    false,
 +                                                    true,
 +                                                    true,
 +                                                    txEntry.keepBinary(),
++                                                    txEntry.hasOldValue(),
++                                                    txEntry.oldValue(),
 +                                                    topVer,
 +                                                    null,
 +                                                    replicate ? DR_BACKUP : DR_NONE,
 +                                                    near() ? null : explicitVer,
 +                                                    CU.subjectId(this, cctx),
 +                                                    resolveTaskName(),
 +                                                    dhtVer,
 +                                                    txEntry.updateCounter());
 +                                            else {
 +                                                cached.innerSet(this,
 +                                                    eventNodeId(),
 +                                                    nodeId,
 +                                                    val,
 +                                                    false,
 +                                                    false,
 +                                                    txEntry.ttl(),
 +                                                    true,
 +                                                    true,
 +                                                    txEntry.keepBinary(),
++                                                    txEntry.hasOldValue(),
++                                                    txEntry.oldValue(),
 +                                                    topVer,
 +                                                    null,
 +                                                    replicate ? DR_BACKUP : DR_NONE,
 +                                                    txEntry.conflictExpireTime(),
 +                                                    near() ? null : explicitVer,
 +                                                    CU.subjectId(this, cctx),
 +                                                    resolveTaskName(),
 +                                                    dhtVer,
 +                                                    txEntry.updateCounter());
 +
 +                                                // Keep near entry up to date.
 +                                                if (nearCached != null) {
 +                                                    CacheObject val0 = cached.valueBytes();
 +
 +                                                    nearCached.updateOrEvict(xidVer,
 +                                                        val0,
 +                                                        cached.expireTime(),
 +                                                        cached.ttl(),
 +                                                        nodeId,
 +                                                        topVer);
 +                                                }
 +                                            }
 +                                        }
 +                                        else if (op == DELETE) {
                                              cached.innerRemove(this,
                                                  eventNodeId(),
                                                  nodeId,
@@@ -652,82 -607,111 +678,86 @@@
                                                      topVer);
                                              }
                                          }
 -                                    }
 -                                    else if (op == DELETE) {
 -                                        cached.innerRemove(this,
 -                                            eventNodeId(),
 -                                            nodeId,
 -                                            false,
 -                                            true,
 -                                            true,
 -                                            txEntry.keepBinary(),
 -                                            txEntry.hasOldValue(),
 -                                            txEntry.oldValue(),
 -                                            topVer,
 -                                            null,
 -                                            replicate ? DR_BACKUP : DR_NONE,
 -                                            near() ? null : explicitVer,
 -                                            CU.subjectId(this, cctx),
 -                                            resolveTaskName(),
 -                                            dhtVer,
 -                                            txEntry.updateCounter());
 -
 -                                        // Keep near entry up to date.
 -                                        if (nearCached != null)
 -                                            nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer);
 -                                    }
 -                                    else if (op == RELOAD) {
 -                                        CacheObject reloaded = cached.innerReload();
 +                                        else if (op == READ) {
 +                                            assert near();
  
 -                                        if (nearCached != null) {
 -                                            nearCached.innerReload();
 -
 -                                            nearCached.updateOrEvict(cached.version(),
 -                                                reloaded,
 -                                                cached.expireTime(),
 -                                                cached.ttl(),
 -                                                nodeId,
 -                                                topVer);
 +                                            if (log.isDebugEnabled())
 +                                                log.debug("Ignoring READ entry when committing: " + txEntry);
                                          }
 -                                    }
 -                                    else if (op == READ) {
 -                                        assert near();
 -
 -                                        if (log.isDebugEnabled())
 -                                            log.debug("Ignoring READ entry when committing: " + txEntry);
 -                                    }
 -                                    // No-op.
 -                                    else {
 -                                        if (conflictCtx == null || !conflictCtx.isUseOld()) {
 -                                            if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
 -                                                cached.updateTtl(null, txEntry.ttl());
 -
 -                                            if (nearCached != null) {
 -                                                CacheObject val0 = cached.valueBytes();
 -
 -                                                nearCached.updateOrEvict(xidVer,
 -                                                    val0,
 -                                                    cached.expireTime(),
 -                                                    cached.ttl(),
 -                                                    nodeId,
 -                                                    topVer);
 +                                        // No-op.
 +                                        else {
 +                                            if (conflictCtx == null || !conflictCtx.isUseOld()) {
 +                                                if (txEntry.ttl() != CU.TTL_NOT_CHANGED)
 +                                                    cached.updateTtl(null, txEntry.ttl());
 +
 +                                                if (nearCached != null) {
 +                                                    CacheObject val0 = cached.valueBytes();
 +
 +                                                    nearCached.updateOrEvict(xidVer,
 +                                                        val0,
 +                                                        cached.expireTime(),
 +                                                        cached.ttl(),
 +                                                        nodeId,
 +                                                        topVer);
 +                                                }
                                              }
                                          }
 -                                    }
  
 -                                    // Assert after setting values as we want to make sure
 -                                    // that if we replaced removed entries.
 -                                    assert
 -                                        txEntry.op() == READ || onePhaseCommit() ||
 -                                            // If candidate is not there, then lock was explicit
 -                                            // and we simply allow the commit to proceed.
 -                                            !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
 -                                        "Transaction does not own lock for commit [entry=" + cached +
 -                                            ", tx=" + this + ']';
 -
 -                                    // Break out of while loop.
 -                                    break;
 -                                }
 -                                catch (GridCacheEntryRemovedException ignored) {
 -                                    if (log.isDebugEnabled())
 -                                        log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
 +                                        // Assert after setting values as we want to make sure
 +                                        // that if we replaced removed entries.
 +                                        assert
 +                                            txEntry.op() == READ || onePhaseCommit() ||
 +                                                // If candidate is not there, then lock was explicit
 +                                                // and we simply allow the commit to proceed.
 +                                                !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) :
 +                                            "Transaction does not own lock for commit [entry=" + cached +
 +                                                ", tx=" + this + ']';
 +
 +                                        // Break out of while loop.
 +                                        break;
 +                                    }
 +                                    catch (GridCacheEntryRemovedException ignored) {
 +                                        if (log.isDebugEnabled())
 +                                            log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
  
 -                                    // Renew cached entry.
 -                                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
 +                                        // Renew cached entry.
 +                                        txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion()));
 +                                    }
                                  }
                              }
 -                        }
 -                    }
 -                    catch (Throwable ex) {
 -                        // In case of error, we still make the best effort to commit,
 -                        // as there is no way to rollback at this point.
 -                        err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
 -                            "(all transaction entries will be invalidated): " + CU.txString(this), ex);
 +                            catch (Throwable ex) {
 +                                // In case of error, we still make the best effort to commit,
 +                                // as there is no way to rollback at this point.
 +                                err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
 +                                    "(all transaction entries will be invalidated): " + CU.txString(this), ex);
 +
 +                                U.error(log, "Commit failed.", err);
  
 -                        U.error(log, "Commit failed.", err);
 +                                uncommit();
  
 -                        uncommit();
 +                                state(UNKNOWN);
  
 -                        state(UNKNOWN);
 +                                if (ex instanceof Error)
 +                                    throw (Error)ex;
 +                            }
++                            finally {
++                                if (wrapper != null)
++                                    wrapper.initialize(ret);
++                            }
 +                        }
  
 -                        if (ex instanceof Error)
 -                            throw (Error)ex;
 +                        if (!near() && cctx.wal() != null)
 +                            cctx.wal().log(new DataRecord(dataEntries));
  
 +                        if (ptr != null)
 +                            cctx.wal().fsync(ptr);
 +                    }
 +                    catch (StorageException e) {
 +                        throw new IgniteCheckedException("Failed to log transaction record " +
 +                            "(transaction will be rolled back): " + this, e);
                      }
                      finally {
 -                        if (wrapper != null)
 -                            wrapper.initialize(ret);
 +                        cctx.database().checkpointReadUnlock();
                      }
                  }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index a6f41ac,816132d..88cd16b
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@@ -97,8 -98,11 +100,11 @@@ public class GridClientPartitionTopolog
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  
      /** Partition update counters. */
 -    private Map<Integer, Long> cntrMap = new HashMap<>();
 +    private Map<Integer, T2<Long, Long>> cntrMap = new HashMap<>();
  
+     /** Key to find caches with similar affinity. */
+     private final Object similarAffKey;
+ 
      /**
       * @param cctx Context.
       * @param cacheId Cache ID.
@@@ -127,9 -134,16 +136,16 @@@
      }
  
      /**
+      * @return Key to find caches with similar affinity.
+      */
+     @Nullable public Object similarAffinityKey() {
+         return similarAffKey;
+     }
+ 
+     /**
       * @return Full map string representation.
       */
 -    @SuppressWarnings( {"ConstantConditions"})
 +    @SuppressWarnings({"ConstantConditions"})
      private String fullMapString() {
          return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString();
      }
@@@ -533,9 -550,9 +550,9 @@@
  
      /** {@inheritDoc} */
      @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-     @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+     @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
          GridDhtPartitionFullMap partMap,
 -        Map<Integer, Long> cntrMap) {
 +        Map<Integer, T2<Long, Long>> cntrMap) {
          if (log.isDebugEnabled())
              log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
  
@@@ -626,10 -643,10 +643,10 @@@
      }
  
      /** {@inheritDoc} */
-     @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
-     @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+     @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
          GridDhtPartitionMap2 parts,
-         Map<Integer, T2<Long, Long>> cntrMap) {
 -        Map<Integer, Long> cntrMap,
++        Map<Integer, T2<Long, Long>> cntrMap,
+         boolean checkEvictions) {
          if (log.isDebugEnabled())
              log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
  
@@@ -674,44 -692,46 +691,46 @@@
  
              long updateSeq = this.updateSeq.incrementAndGet();
  
-             node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
- 
-             boolean changed = false;
+             node2part.updateSequence(updateSeq);
  
-             if (cur == null || !cur.equals(parts))
-                 changed = true;
+             boolean changed = cur == null || !cur.equals(parts);
  
-             node2part.put(parts.nodeId(), parts);
+             if (changed) {
+                 node2part.put(parts.nodeId(), parts);
  
-             part2node = new HashMap<>(part2node);
+                 // Add new mappings.
+                 for (Integer p : parts.keySet()) {
+                     Set<UUID> ids = part2node.get(p);
  
-             // Add new mappings.
-             for (Integer p : parts.keySet()) {
-                 Set<UUID> ids = part2node.get(p);
+                     if (ids == null)
+                         // Initialize HashSet to size 3 in anticipation that there won't be
+                         // more than 3 nodes per partition.
+                         part2node.put(p, ids = U.newHashSet(3));
  
-                 if (ids == null)
-                     // Initialize HashSet to size 3 in anticipation that there won't be
-                     // more than 3 nodes per partition.
-                     part2node.put(p, ids = U.newHashSet(3));
+                     ids.add(parts.nodeId());
+                 }
  
-                 changed |= ids.add(parts.nodeId());
-             }
+                 // Remove obsolete mappings.
+                 if (cur != null) {
+                     for (Integer p : cur.keySet()) {
+                         if (parts.containsKey(p))
+                             continue;
  
-             // Remove obsolete mappings.
-             if (cur != null) {
-                 for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
-                     Set<UUID> ids = part2node.get(p);
+                         Set<UUID> ids = part2node.get(p);
  
-                     if (ids != null)
-                         changed |= ids.remove(parts.nodeId());
+                         if (ids != null)
+                             ids.remove(parts.nodeId());
+                     }
                  }
              }
+             else
+                 cur.updateSequence(parts.updateSequence(), parts.topologyVersion());
  
              if (cntrMap != null) {
 -                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
 -                    Long cntr = this.cntrMap.get(e.getKey());
 +                for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
 +                    T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
  
 -                    if (cntr == null || cntr < e.getValue())
 +                    if (cntr == null || cntr.get2() < e.getValue().get2())
                          this.cntrMap.put(e.getKey(), e.getValue());
                  }
              }
@@@ -729,24 -749,10 +748,29 @@@
      }
  
      /** {@inheritDoc} */
 +    @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) {
 +        assert false : "detectLostPartitions should never be called on client topology";
 +
 +        return false;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void resetLostPartitions() {
 +        assert false : "resetLostPartitions should never be called on client topology";
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Collection<Integer> lostPartitions() {
 +        assert false : "lostPartitions should never be called on client topology";
 +
 +        return Collections.emptyList();
 +    }
 +
++    /** {@inheritDoc} */
+     @Override public void checkEvictions() {
+         // No-op.
+     }
+ 
      /**
       * Updates value for single partition.
       *
@@@ -877,48 -882,22 +900,36 @@@
      }
  
      /** {@inheritDoc} */
-     @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) {
 -    @Override public Map<Integer, Long> updateCounters(boolean skipZeros) {
--        lock.readLock().lock();
- 
-         try {
-             return node2part.get(nodeId);
-         }
-         finally {
-             lock.readLock().unlock();
-         }
-     }
- 
-     /** {@inheritDoc} */
 +    @Override public void setOwners(int p, Set<UUID> owners, boolean updateSeq) {
 +        lock.writeLock().lock();
  
          try {
 -            if (skipZeros) {
 -                Map<Integer, Long> res = U.newHashMap(cntrMap.size());
 -
 -                for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
 -                    if (!e.getValue().equals(ZERO))
 -                        res.put(e.getKey(), e.getValue());
 -                }
 +            for (Map.Entry<UUID, GridDhtPartitionMap2> e : node2part.entrySet()) {
 +                if (!e.getValue().containsKey(p))
 +                    continue;
  
 -                return res;
 +                if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey()))
 +                    e.getValue().put(p, MOVING);
 +                else if (owners.contains(e.getKey()))
 +                    e.getValue().put(p, OWNING);
              }
 -            else
 -                return new HashMap<>(cntrMap);
 +
 +            part2node.put(p, owners);
 +
 +            if (updateSeq)
 +                this.updateSeq.incrementAndGet();
 +        }
 +        finally {
 +            lock.writeLock().unlock();
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public Map<Integer, T2<Long, Long>> updateCounters() {
 +        lock.readLock().lock();
 +
 +        try {
 +            return new HashMap<>(cntrMap);
          }
          finally {
              lock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------


Mime
View raw message