ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [09/50] [abbrv] incubator-ignite git commit: Merge branch ignite-sql-tests into ignite-45
Date Mon, 16 Mar 2015 09:22:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index e7ce96f,0c9ea00..d410521
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -259,11 -244,11 +259,11 @@@ class GridDhtPartitionTopologyImpl<K, V
                  }
              }
  
-             if (cctx.preloadEnabled()) {
+             if (cctx.rebalanceEnabled()) {
                  for (int p = 0; p < num; p++) {
                      // If this is the first node in grid.
 -                    if (oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) {
 -                        assert exchId.isJoined();
 +                    if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId())) || exchId.isCacheAdded(cctx.cacheId())) {
 +                        assert exchId.isJoined() || exchId.isCacheAdded(cctx.cacheId());
  
                          try {
                              GridDhtLocalPartition locPart = localPartition(p, topVer, true, false);
@@@ -676,8 -658,8 +676,8 @@@
      }
  
      /** {@inheritDoc} */
 -    @Override public List<ClusterNode> owners(int p, long topVer) {
 +    @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
-         if (!cctx.preloadEnabled())
+         if (!cctx.rebalanceEnabled())
              return ownersAndMoving(p, topVer);
  
          return nodes(p, topVer, OWNING);
@@@ -690,10 -672,10 +690,10 @@@
  
      /** {@inheritDoc} */
      @Override public List<ClusterNode> moving(int p) {
-         if (!cctx.preloadEnabled())
+         if (!cctx.rebalanceEnabled())
 -            return ownersAndMoving(p, -1);
 +            return ownersAndMoving(p, AffinityTopologyVersion.NONE);
  
 -        return nodes(p, -1, MOVING);
 +        return nodes(p, AffinityTopologyVersion.NONE, MOVING);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index b566720,19279d4..5edddd0
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@@ -1213,9 -1212,9 +1213,9 @@@ public final class GridDhtTxPrepareFutu
                      }
                  }
  
 -                long topVer = tx.topologyVersion();
 +                AffinityTopologyVersion topVer = tx.topologyVersion();
  
-                 boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_OBJECT_LOADED);
+                 boolean rec = cctx.gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED);
  
                  for (GridCacheEntryInfo info : res.preloadEntries()) {
                      GridCacheContext<K, V> cacheCtx = cctx.cacheContext(info.cacheId());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index 144ed7a,0f2443e..3ba08a3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@@ -207,10 -206,10 +207,10 @@@ public class GridDhtPartitionDemandPool
  
          if (exchFut != null) {
              if (log.isDebugEnabled())
-                 log.debug("Forcing preload event for future: " + exchFut);
+                 log.debug("Forcing rebalance event for future: " + exchFut);
  
 -            exchFut.listen(new CI1<IgniteInternalFuture<Long>>() {
 -                @Override public void apply(IgniteInternalFuture<Long> t) {
 +            exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
 +                @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                      cctx.shared().exchange().forcePreloadExchange(exchFut);
                  }
              });

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 5690aab,347489c..24e421d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@@ -18,9 -18,8 +18,10 @@@
  package org.apache.ignite.internal.processors.cache.transactions;
  
  import org.apache.ignite.*;
 +import org.apache.ignite.cluster.*;
 +import org.apache.ignite.internal.processors.affinity.*;
  import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
  import org.apache.ignite.internal.managers.communication.*;
  import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.distributed.near.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 0000000,dd8df35..dde98c6
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@@ -1,0 -1,1403 +1,1403 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.datastreamer;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.processors.cacheobject.*;
+ import org.apache.ignite.internal.processors.dr.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.util.*;
+ import java.util.Map.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.events.EventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ 
+ /**
+  * Data streamer implementation.
+  */
+ @SuppressWarnings("unchecked")
+ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+     /** Isolated updater. */
+     private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+ 
+     /** Cache updater. */
+     private Updater<K, V> updater = ISOLATED_UPDATER;
+ 
+     /** */
+     private byte[] updaterBytes;
+ 
+     /** Max remap count before issuing an error. */
+     private static final int DFLT_MAX_REMAP_CNT = 32;
+ 
+     /** Log reference. */
+     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+ 
+     /** Logger. */
+     private static IgniteLogger log;
+ 
+     /** Cache name ({@code null} for default cache). */
+     private final String cacheName;
+ 
+ 
+     /** Per-node buffer size. */
+     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+     private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+ 
+     /** */
+     private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+ 
+     /** */
+     private long autoFlushFreq;
+ 
+     /** Mapping. */
+     @GridToStringInclude
+     private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
+ 
+     /** Discovery listener. */
+     private final GridLocalEventListener discoLsnr;
+ 
+     /** Context. */
+     private final GridKernalContext ctx;
+ 
+     /** */
+     private final IgniteCacheObjectProcessor cacheObjProc;
+ 
+     /** */
+     private final CacheObjectContext cacheObjCtx;
+ 
+     /** Communication topic for responses. */
+     private final Object topic;
+ 
+     /** */
+     private byte[] topicBytes;
+ 
+     /** {@code True} if data loader has been cancelled. */
+     private volatile boolean cancelled;
+ 
+     /** Active futures of this data loader. */
+     @GridToStringInclude
+     private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
+ 
+     /** Closure to remove from active futures. */
+     @GridToStringExclude
+     private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
+         @Override public void apply(IgniteInternalFuture<?> t) {
+             boolean rmv = activeFuts.remove(t);
+ 
+             assert rmv;
+         }
+     };
+ 
+     /** Job peer deploy aware. */
+     private volatile GridPeerDeployAware jobPda;
+ 
+     /** Deployment class. */
+     private Class<?> depCls;
+ 
+     /** Future to track loading finish. */
+     private final GridFutureAdapter<?> fut;
+ 
+     /** Public API future to track loading finish. */
+     private final IgniteFuture<?> publicFut;
+ 
+     /** Busy lock. */
+     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ 
+     /** Closed flag. */
+     private final AtomicBoolean closed = new AtomicBoolean();
+ 
+     /** */
+     private volatile long lastFlushTime = U.currentTimeMillis();
+ 
+     /** */
+     private final DelayQueue<DataStreamerImpl<K, V>> flushQ;
+ 
+     /** */
+     private boolean skipStore;
+ 
+     /** */
+     private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+ 
+     /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */
+     private static boolean isWarningPrinted;
+ 
+     /**
+      * @param ctx Grid kernal context.
+      * @param cacheName Cache name.
+      * @param flushQ Flush queue.
+      */
+     public DataStreamerImpl(
+         final GridKernalContext ctx,
+         @Nullable final String cacheName,
+         DelayQueue<DataStreamerImpl<K, V>> flushQ
+     ) {
+         assert ctx != null;
+ 
+         this.ctx = ctx;
+         this.cacheObjProc = ctx.cacheObjects();
+ 
+         if (log == null)
+             log = U.logger(ctx, logRef, DataStreamerImpl.class);
+ 
+         ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+ 
+         if (node == null)
+             throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+ 
 -        this.cacheObjCtx = ctx.cacheObjects().contextForCache(node, cacheName);
++        this.cacheObjCtx = ctx.cacheObjects().contextForCache(node, cacheName, null);
+         this.cacheName = cacheName;
+         this.flushQ = flushQ;
+ 
+         discoLsnr = new GridLocalEventListener() {
+             @Override public void onEvent(Event evt) {
+                 assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+ 
+                 DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+ 
+                 UUID id = discoEvt.eventNode().id();
+ 
+                 // Remap regular mappings.
+                 final Buffer buf = bufMappings.remove(id);
+ 
+                 if (buf != null) {
+                     // Only async notification is possible since
+                     // discovery thread may be trapped otherwise.
+                     ctx.closure().callLocalSafe(
+                         new Callable<Object>() {
+                             @Override public Object call() throws Exception {
+                                 buf.onNodeLeft();
+ 
+                                 return null;
+                             }
+                         },
+                         true /* system pool */
+                     );
+                 }
+             }
+         };
+ 
+         ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+ 
+         // Generate unique topic for this loader.
+         topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+ 
+         ctx.io().addMessageListener(topic, new GridMessageListener() {
+             @Override public void onMessage(UUID nodeId, Object msg) {
+                 assert msg instanceof DataStreamerResponse;
+ 
+                 DataStreamerResponse res = (DataStreamerResponse)msg;
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Received data load response: " + res);
+ 
+                 Buffer buf = bufMappings.get(nodeId);
+ 
+                 if (buf != null)
+                     buf.onResponse(res);
+ 
+                 else if (log.isDebugEnabled())
+                     log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
+             }
+         });
+ 
+         if (log.isDebugEnabled())
+             log.debug("Added response listener within topic: " + topic);
+ 
+         fut = new DataStreamerFuture(this);
+ 
+         publicFut = new IgniteFutureImpl<>(fut);
+     }
+ 
+     /**
+      * @return Cache object context.
+      */
+     public CacheObjectContext cacheObjectContext() {
+         return cacheObjCtx;
+     }
+ 
+     /**
+      * Enters busy lock.
+      */
+     private void enterBusy() {
+         if (!busyLock.enterBusy())
+             throw new IllegalStateException("Data streamer has been closed.");
+     }
+ 
+     /**
+      * Leaves busy lock.
+      */
+     private void leaveBusy() {
+         busyLock.leaveBusy();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> future() {
+         return publicFut;
+     }
+ 
+     /**
+      * @return Internal future.
+      */
+     public IgniteInternalFuture<?> internalFuture() {
+         return fut;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void deployClass(Class<?> depCls) {
+         this.depCls = depCls;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void updater(Updater<K, V> updater) {
+         A.notNull(updater, "updater");
+ 
+         this.updater = updater;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean allowOverwrite() {
+         return updater != ISOLATED_UPDATER;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void allowOverwrite(boolean allow) {
+         if (allow == allowOverwrite())
+             return;
+ 
+         ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+ 
+         if (node == null)
+             throw new IgniteException("Failed to get node for cache: " + cacheName);
+ 
+         updater = allow ? DataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean skipStore() {
+         return skipStore;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void skipStore(boolean skipStore) {
+         this.skipStore = skipStore;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override @Nullable public String cacheName() {
+         return cacheName;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int perNodeBufferSize() {
+         return bufSize;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void perNodeBufferSize(int bufSize) {
+         A.ensure(bufSize > 0, "bufSize > 0");
+ 
+         this.bufSize = bufSize;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int perNodeParallelOperations() {
+         return parallelOps;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void perNodeParallelOperations(int parallelOps) {
+         this.parallelOps = parallelOps;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long autoFlushFrequency() {
+         return autoFlushFreq;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void autoFlushFrequency(long autoFlushFreq) {
+         A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+ 
+         long old = this.autoFlushFreq;
+ 
+         if (autoFlushFreq != old) {
+             this.autoFlushFreq = autoFlushFreq;
+ 
+             if (autoFlushFreq != 0 && old == 0)
+                 flushQ.add(this);
+             else if (autoFlushFreq == 0)
+                 flushQ.remove(this);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+         A.notNull(entries, "entries");
+ 
+         return addData(entries.entrySet());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+         A.notEmpty(entries, "entries");
+ 
+         enterBusy();
+ 
+         try {
+             GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+ 
+             resFut.listen(rmvActiveFut);
+ 
+             activeFuts.add(resFut);
+ 
+             Collection<KeyCacheObject> keys = null;
+ 
+             if (entries.size() > 1) {
+                 keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+ 
+                 for (Map.Entry<K, V> entry : entries)
+                     keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey(), true));
+             }
+ 
+             Collection<? extends DataStreamerEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() {
+                 @Override public DataStreamerEntry apply(Entry<K, V> e) {
+                     KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true);
+                     CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true);
+ 
+                     return new DataStreamerEntry(key, val);
+                 }
+             });
+ 
+             load0(entries0, resFut, keys, 0);
+ 
+             return new IgniteFutureImpl<>(resFut);
+         }
+         catch (IgniteException e) {
+             return new IgniteFinishedFutureImpl<>(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /**
+      * @param key Key.
+      * @param val Value.
+      * @return Future.
+      */
+     public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject val) {
+         return addDataInternal(Collections.singleton(new DataStreamerEntry(key, val)));
+     }
+ 
+     /**
+      * @param key Key.
+      * @return Future.
+      */
+     public IgniteFuture<?> removeDataInternal(KeyCacheObject key) {
+         return addDataInternal(Collections.singleton(new DataStreamerEntry(key, null)));
+     }
+ 
+     /**
+      * @param entries Entries.
+      * @return Future.
+      */
+     public IgniteFuture<?> addDataInternal(Collection<? extends DataStreamerEntry> entries) {
+         enterBusy();
+ 
+         GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+ 
+         try {
+             resFut.listen(rmvActiveFut);
+ 
+             activeFuts.add(resFut);
+ 
+             Collection<KeyCacheObject> keys = null;
+ 
+             if (entries.size() > 1) {
+                 keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+ 
+                 for (DataStreamerEntry entry : entries)
+                     keys.add(entry.getKey());
+             }
+ 
+             load0(entries, resFut, keys, 0);
+ 
+             return new IgniteFutureImpl<>(resFut);
+         }
+         catch (Throwable e) {
+             resFut.onDone(e);
+ 
+             if (e instanceof Error)
+                 throw e;
+ 
+             return new IgniteFinishedFutureImpl<>(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+         A.notNull(entry, "entry");
+ 
+         return addData(F.asList(entry));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> addData(K key, V val) {
+         A.notNull(key, "key");
+ 
+         KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true);
+         CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
+ 
+         return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0)));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> removeData(K key) {
+         return addData(key, null);
+     }
+ 
+     /**
+      * @param entries Entries.
+      * @param resFut Result future.
+      * @param activeKeys Active keys.
+      * @param remaps Remaps count.
+      */
+     private void load0(
+         Collection<? extends DataStreamerEntry> entries,
+         final GridFutureAdapter<Object> resFut,
+         @Nullable final Collection<KeyCacheObject> activeKeys,
+         final int remaps
+     ) {
+         assert entries != null;
+ 
+         if (!isWarningPrinted) {
+             synchronized (this) {
+                 if (!allowOverwrite() && !isWarningPrinted) {
+                     U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " +
+                         "(to change, set allowOverwrite to true)");
+                 }
+ 
+                 isWarningPrinted = true;
+             }
+         }
+ 
+         Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();
+ 
+         boolean initPda = ctx.deploy().enabled() && jobPda == null;
+ 
+         for (DataStreamerEntry entry : entries) {
+             List<ClusterNode> nodes;
+ 
+             try {
+                 KeyCacheObject key = entry.getKey();
+ 
+                 assert key != null;
+ 
+                 if (initPda) {
+                     jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
+                         entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
+                         updater);
+ 
+                     initPda = false;
+                 }
+ 
+                 nodes = nodes(key);
+             }
+             catch (IgniteCheckedException e) {
+                 resFut.onDone(e);
+ 
+                 return;
+             }
+ 
+             if (F.isEmpty(nodes)) {
+                 resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+                     "(no nodes with cache found in topology) [infos=" + entries.size() +
+                     ", cacheName=" + cacheName + ']'));
+ 
+                 return;
+             }
+ 
+             for (ClusterNode node : nodes) {
+                 Collection<DataStreamerEntry> col = mappings.get(node);
+ 
+                 if (col == null)
+                     mappings.put(node, col = new ArrayList<>());
+ 
+                 col.add(entry);
+             }
+         }
+ 
+         for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
+             final UUID nodeId = e.getKey().id();
+ 
+             Buffer buf = bufMappings.get(nodeId);
+ 
+             if (buf == null) {
+                 Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+ 
+                 if (old != null)
+                     buf = old;
+             }
+ 
+             final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+ 
+             IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+                 @Override public void apply(IgniteInternalFuture<?> t) {
+                     try {
+                         t.get();
+ 
+                         if (activeKeys != null) {
+                             for (DataStreamerEntry e : entriesForNode)
+                                 activeKeys.remove(e.getKey());
+ 
+                             if (activeKeys.isEmpty())
+                                 resFut.onDone();
+                         }
+                         else {
+                             assert entriesForNode.size() == 1;
+ 
+                             // That has been a single key,
+                             // so complete result future right away.
+                             resFut.onDone();
+                         }
+                     }
+                     catch (IgniteCheckedException e1) {
+                         if (log.isDebugEnabled())
+                             log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+ 
+                         if (cancelled) {
+                             resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                                 DataStreamerImpl.this, e1));
+                         }
+                         else if (remaps + 1 > maxRemapCnt) {
+                             resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+                                 + remaps), e1);
+                         }
+                         else
+                             load0(entriesForNode, resFut, activeKeys, remaps + 1);
+                     }
+                 }
+             };
+ 
+             GridFutureAdapter<?> f;
+ 
+             try {
+                 f = buf.update(entriesForNode, lsnr);
+             }
+             catch (IgniteInterruptedCheckedException e1) {
+                 resFut.onDone(e1);
+ 
+                 return;
+             }
+ 
+             if (ctx.discovery().node(nodeId) == null) {
+                 if (bufMappings.remove(nodeId, buf))
+                     buf.onNodeLeft();
+ 
+                 if (f != null)
+                     f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                         "(node has left): " + nodeId));
+             }
+         }
+     }
+ 
+     /**
+      * @param key Key to map.
+      * @return Nodes to send requests to.
+      * @throws IgniteCheckedException If failed.
+      */
+     private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException {
+         GridAffinityProcessor aff = ctx.affinity();
+ 
+         return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
+             Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+     }
+ 
+     /**
+      * Performs flush.
+      *
+      * @throws IgniteCheckedException If failed.
+      */
+     private void doFlush() throws IgniteCheckedException {
+         lastFlushTime = U.currentTimeMillis();
+ 
+         List<IgniteInternalFuture> activeFuts0 = null;
+ 
+         int doneCnt = 0;
+ 
+         for (IgniteInternalFuture<?> f : activeFuts) {
+             if (!f.isDone()) {
+                 if (activeFuts0 == null)
+                     activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
+ 
+                 activeFuts0.add(f);
+             }
+             else {
+                 f.get();
+ 
+                 doneCnt++;
+             }
+         }
+ 
+         if (activeFuts0 == null || activeFuts0.isEmpty())
+             return;
+ 
+         while (true) {
+             Queue<IgniteInternalFuture<?>> q = null;
+ 
+             for (Buffer buf : bufMappings.values()) {
+                 IgniteInternalFuture<?> flushFut = buf.flush();
+ 
+                 if (flushFut != null) {
+                     if (q == null)
+                         q = new ArrayDeque<>(bufMappings.size() * 2);
+ 
+                     q.add(flushFut);
+                 }
+             }
+ 
+             if (q != null) {
+                 assert !q.isEmpty();
+ 
+                 boolean err = false;
+ 
+                 for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
+                     try {
+                         fut.get();
+                     }
+                     catch (IgniteCheckedException e) {
+                         if (log.isDebugEnabled())
+                             log.debug("Failed to flush buffer: " + e);
+ 
+                         err = true;
+                     }
+                 }
+ 
+                 if (err)
+                     // Remaps needed - flush buffers.
+                     continue;
+             }
+ 
+             doneCnt = 0;
+ 
+             for (int i = 0; i < activeFuts0.size(); i++) {
+                 IgniteInternalFuture f = activeFuts0.get(i);
+ 
+                 if (f == null)
+                     doneCnt++;
+                 else if (f.isDone()) {
+                     f.get();
+ 
+                     doneCnt++;
+ 
+                     activeFuts0.set(i, null);
+                 }
+                 else
+                     break;
+             }
+ 
+             if (doneCnt == activeFuts0.size())
+                 return;
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("ForLoopReplaceableByForEach")
+     @Override public void flush() throws IgniteException {
+         enterBusy();
+ 
+         try {
+             doFlush();
+         }
+         catch (IgniteCheckedException e) {
+             throw U.convertException(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /**
+      * Flushes every internal buffer if buffer was flushed before passed in
+      * threshold.
+      * <p>
+      * Does not wait for result and does not fail on errors assuming that this method
+      * should be called periodically.
+      */
+     @Override public void tryFlush() throws IgniteInterruptedException {
+         if (!busyLock.enterBusy())
+             return;
+ 
+         try {
+             for (Buffer buf : bufMappings.values())
+                 buf.flush();
+ 
+             lastFlushTime = U.currentTimeMillis();
+         }
+         catch (IgniteInterruptedCheckedException e) {
+             throw U.convertException(e);
+         }
+         finally {
+             leaveBusy();
+         }
+     }
+ 
+     /**
+      * @param cancel {@code True} to close with cancellation.
+      * @throws IgniteException If failed.
+      */
+     @Override public void close(boolean cancel) throws IgniteException {
+         try {
+             closeEx(cancel);
+         }
+         catch (IgniteCheckedException e) {
+             throw U.convertException(e);
+         }
+     }
+ 
+     /**
+      * @param cancel {@code True} to close with cancellation.
+      * @throws IgniteCheckedException If failed.
+      */
+     public void closeEx(boolean cancel) throws IgniteCheckedException {
+         if (!closed.compareAndSet(false, true))
+             return;
+ 
+         busyLock.block();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
+ 
+         IgniteCheckedException e = null;
+ 
+         try {
+             // Assuming that no methods are called on this loader after this method is called.
+             if (cancel) {
+                 cancelled = true;
+ 
+                 for (Buffer buf : bufMappings.values())
+                     buf.cancelAll();
+             }
+             else
+                 doFlush();
+ 
+             ctx.event().removeLocalEventListener(discoLsnr);
+ 
+             ctx.io().removeMessageListener(topic);
+         }
+         catch (IgniteCheckedException e0) {
+             e = e0;
+         }
+ 
+         fut.onDone(null, e);
+ 
+         if (e != null)
+             throw e;
+     }
+ 
+     /**
+      * @return {@code true} If the loader is closed.
+      */
+     boolean isClosed() {
+         return fut.isDone();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void close() throws IgniteException {
+         close(false);
+     }
+ 
+     /**
+      * @return Max remap count.
+      */
+     public int maxRemapCount() {
+         return maxRemapCnt;
+     }
+ 
+     /**
+      * @param maxRemapCnt New max remap count.
+      */
+     public void maxRemapCount(int maxRemapCnt) {
+         this.maxRemapCnt = maxRemapCnt;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(DataStreamerImpl.class, this);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long getDelay(TimeUnit unit) {
+         return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+     }
+ 
+     /**
+      * @return Next flush time.
+      */
+     private long nextFlushTime() {
+         return lastFlushTime + autoFlushFreq;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int compareTo(Delayed o) {
+         return nextFlushTime() > ((DataStreamerImpl)o).nextFlushTime() ? 1 : -1;
+     }
+ 
+     /**
+      *
+      */
+     private class Buffer {
+         /** Node. */
+         private final ClusterNode node;
+ 
+         /** Active futures. */
+         private final Collection<IgniteInternalFuture<Object>> locFuts;
+ 
+         /** Buffered entries. */
+         private List<DataStreamerEntry> entries;
+ 
+         /** */
+         @GridToStringExclude
+         private GridFutureAdapter<Object> curFut;
+ 
+         /** Local node flag. */
+         private final boolean isLocNode;
+ 
+         /** ID generator. */
+         private final AtomicLong idGen = new AtomicLong();
+ 
+         /** Active futures. */
+         private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+ 
+         /** */
+         private final Semaphore sem;
+ 
+         /** Closure to signal on task finish. */
+         @GridToStringExclude
+         private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
+             @Override public void apply(IgniteInternalFuture<Object> t) {
+                 signalTaskFinished(t);
+             }
+         };
+ 
+         /**
+          * @param node Node.
+          */
+         Buffer(ClusterNode node) {
+             assert node != null;
+ 
+             this.node = node;
+ 
+             locFuts = new GridConcurrentHashSet<>();
+             reqs = new ConcurrentHashMap8<>();
+ 
+             // Cache local node flag.
+             isLocNode = node.equals(ctx.discovery().localNode());
+ 
+             entries = newEntries();
+             curFut = new GridFutureAdapter<>();
+             curFut.listen(signalC);
+ 
+             sem = new Semaphore(parallelOps);
+         }
+ 
+         /**
+          * @param newEntries Infos.
+          * @param lsnr Listener for the operation future.
+          * @throws IgniteInterruptedCheckedException If failed.
+          * @return Future for operation.
+          */
+         @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+             IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+             List<DataStreamerEntry> entries0 = null;
+             GridFutureAdapter<Object> curFut0;
+ 
+             synchronized (this) {
+                 curFut0 = curFut;
+ 
+                 curFut0.listen(lsnr);
+ 
+                 for (DataStreamerEntry entry : newEntries)
+                     entries.add(entry);
+ 
+                 if (entries.size() >= bufSize) {
+                     entries0 = entries;
+ 
+                     entries = newEntries();
+                     curFut = new GridFutureAdapter<>();
+                     curFut.listen(signalC);
+                 }
+             }
+ 
+             if (entries0 != null) {
+                 submit(entries0, curFut0);
+ 
+                 if (cancelled)
+                     curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
+             }
+ 
+             return curFut0;
+         }
+ 
+         /**
+          * @return Fresh collection with some space for outgrowth.
+          */
+         private List<DataStreamerEntry> newEntries() {
+             return new ArrayList<>((int)(bufSize * 1.2));
+         }
+ 
+         /**
+          * @return Future if any submitted.
+          *
+          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+          */
+         @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
+             List<DataStreamerEntry> entries0 = null;
+             GridFutureAdapter<Object> curFut0 = null;
+ 
+             synchronized (this) {
+                 if (!entries.isEmpty()) {
+                     entries0 = entries;
+                     curFut0 = curFut;
+ 
+                     entries = newEntries();
+                     curFut = new GridFutureAdapter<>();
+                     curFut.listen(signalC);
+                 }
+             }
+ 
+             if (entries0 != null)
+                 submit(entries0, curFut0);
+ 
+             // Create compound future for this flush.
+             GridCompoundFuture<Object, Object> res = null;
+ 
+             for (IgniteInternalFuture<Object> f : locFuts) {
+                 if (res == null)
+                     res = new GridCompoundFuture<>();
+ 
+                 res.add(f);
+             }
+ 
+             for (IgniteInternalFuture<Object> f : reqs.values()) {
+                 if (res == null)
+                     res = new GridCompoundFuture<>();
+ 
+                 res.add(f);
+             }
+ 
+             if (res != null)
+                 res.markInitialized();
+ 
+             return res;
+         }
+ 
+         /**
+          * Increments active tasks count.
+          *
+          * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+          */
+         private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
+             U.acquire(sem);
+         }
+ 
+         /**
+          * @param f Future that finished.
+          */
+         private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+             assert f != null;
+ 
+             sem.release();
+         }
+ 
+         /**
+          * @param entries Entries to submit.
+          * @param curFut Current future.
+          * @throws IgniteInterruptedCheckedException If interrupted.
+          */
+         private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut)
+             throws IgniteInterruptedCheckedException {
+             assert entries != null;
+             assert !entries.isEmpty();
+             assert curFut != null;
+ 
+             incrementActiveTasks();
+ 
+             IgniteInternalFuture<Object> fut;
+ 
+             if (isLocNode) {
+                 fut = ctx.closure().callLocalSafe(
+                     new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, updater), false);
+ 
+                 locFuts.add(fut);
+ 
+                 fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                     @Override public void apply(IgniteInternalFuture<Object> t) {
+                         try {
+                             boolean rmv = locFuts.remove(t);
+ 
+                             assert rmv;
+ 
+                             curFut.onDone(t.get());
+                         }
+                         catch (IgniteCheckedException e) {
+                             curFut.onDone(e);
+                         }
+                     }
+                 });
+             }
+             else {
+                 try {
+                     for (DataStreamerEntry e : entries) {
+                         e.getKey().prepareMarshal(cacheObjCtx);
+ 
+                         CacheObject val = e.getValue();
+ 
+                         if (val != null)
+                             val.prepareMarshal(cacheObjCtx);
+                     }
+ 
+                     if (updaterBytes == null) {
+                         assert updater != null;
+ 
+                         updaterBytes = ctx.config().getMarshaller().marshal(updater);
+                     }
+ 
+                     if (topicBytes == null)
+                         topicBytes = ctx.config().getMarshaller().marshal(topic);
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to marshal (request will not be sent).", e);
+ 
+                     return;
+                 }
+ 
+                 GridDeployment dep = null;
+                 GridPeerDeployAware jobPda0 = null;
+ 
+                 if (ctx.deploy().enabled()) {
+                     try {
+                         jobPda0 = jobPda;
+ 
+                         assert jobPda0 != null;
+ 
+                         dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+ 
+                         GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+ 
+                         if (cache != null)
+                             cache.context().deploy().onEnter();
+                     }
+                     catch (IgniteCheckedException e) {
+                         U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+ 
+                         return;
+                     }
+ 
+                     if (dep == null)
+                         U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+                 }
+ 
+                 long reqId = idGen.incrementAndGet();
+ 
+                 fut = curFut;
+ 
+                 reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+ 
+                 DataStreamerRequest req = new DataStreamerRequest(
+                     reqId,
+                     topicBytes,
+                     cacheName,
+                     updaterBytes,
+                     entries,
+                     true,
+                     skipStore,
+                     dep != null ? dep.deployMode() : null,
+                     dep != null ? jobPda0.deployClass().getName() : null,
+                     dep != null ? dep.userVersion() : null,
+                     dep != null ? dep.participants() : null,
+                     dep != null ? dep.classLoaderId() : null,
+                     dep == null);
+ 
+                 try {
+                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+                 }
+                 catch (IgniteCheckedException e) {
+                     if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                         ((GridFutureAdapter<Object>)fut).onDone(e);
+                     else
+                         ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
+                             "request (node has left): " + node.id()));
+                 }
+             }
+         }
+ 
+         /**
+          *
+          */
+         void onNodeLeft() {
+             assert !isLocNode;
+             assert bufMappings.get(node.id()) != this;
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Forcibly completing futures (node has left): " + node.id());
+ 
+             Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                 "(node has left): " + node.id());
+ 
+             for (GridFutureAdapter<Object> f : reqs.values())
+                 f.onDone(e);
+ 
+             // Make sure to complete current future.
+             GridFutureAdapter<Object> curFut0;
+ 
+             synchronized (this) {
+                 curFut0 = curFut;
+             }
+ 
+             curFut0.onDone(e);
+         }
+ 
+         /**
+          * @param res Response.
+          */
+         void onResponse(DataStreamerResponse res) {
+             if (log.isDebugEnabled())
+                 log.debug("Received data load response: " + res);
+ 
+             GridFutureAdapter<?> f = reqs.remove(res.requestId());
+ 
+             if (f == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Future for request has not been found: " + res.requestId());
+ 
+                 return;
+             }
+ 
+             Throwable err = null;
+ 
+             byte[] errBytes = res.errorBytes();
+ 
+             if (errBytes != null) {
+                 try {
+                     GridPeerDeployAware jobPda0 = jobPda;
+ 
+                     err = ctx.config().getMarshaller().unmarshal(
+                         errBytes,
+                         jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
+                 }
+                 catch (IgniteCheckedException e) {
+                     f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
+ 
+                     return;
+                 }
+             }
+ 
+             f.onDone(null, err);
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
+         }
+ 
+         /**
+          *
+          */
+         void cancelAll() {
+             IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
+ 
+             for (IgniteInternalFuture<?> f : locFuts) {
+                 try {
+                     f.cancel();
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to cancel mini-future.", e);
+                 }
+             }
+ 
+             for (GridFutureAdapter<?> f : reqs.values())
+                 f.onDone(err);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             int size;
+ 
+             synchronized (this) {
+                 size = entries.size();
+             }
+ 
+             return S.toString(Buffer.class, this,
+                 "entriesCnt", size,
+                 "locFutsSize", locFuts.size(),
+                 "reqsSize", reqs.size());
+         }
+     }
+ 
+     /**
+      * Data streamer peer-deploy aware.
+      */
+     private class DataStreamerPda implements GridPeerDeployAware {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Deploy class. */
+         private Class<?> cls;
+ 
+         /** Class loader. */
+         private ClassLoader ldr;
+ 
+         /** Collection of objects to detect deploy class and class loader. */
+         private Collection<Object> objs;
+ 
+         /**
+          * Constructs data streamer peer-deploy aware.
+          *
+          * @param objs Collection of objects to detect deploy class and class loader.
+          */
+         private DataStreamerPda(Object... objs) {
+             this.objs = Arrays.asList(objs);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Class<?> deployClass() {
+             if (cls == null) {
+                 Class<?> cls0 = null;
+ 
+                 if (depCls != null)
+                     cls0 = depCls;
+                 else {
+                     for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+                         Object o = it.next();
+ 
+                         if (o != null)
+                             cls0 = U.detectClass(o);
+                     }
+ 
+                     if (cls0 == null || U.isJdk(cls0))
+                         cls0 = DataStreamerImpl.class;
+                 }
+ 
+                 assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
+ 
+                 cls = cls0;
+             }
+ 
+             return cls;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public ClassLoader classLoader() {
+             if (ldr == null) {
+                 ClassLoader ldr0 = deployClass().getClassLoader();
+ 
+                 // Safety.
+                 if (ldr0 == null)
+                     ldr0 = U.gridClassLoader();
+ 
+                 assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
+ 
+                 ldr = ldr0;
+             }
+ 
+             return ldr;
+         }
+     }
+ 
+     /**
+      * Isolated updater which only loads entry initial value.
+      */
+     private static class IsolatedUpdater implements Updater<KeyCacheObject, CacheObject>,
+         DataStreamerCacheUpdaters.InternalUpdater {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** {@inheritDoc} */
+         @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache,
+             Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+             IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
+ 
+             GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache();
+ 
+             if (internalCache.isNear())
+                 internalCache = internalCache.context().near().dht();
+ 
+             GridCacheContext cctx = internalCache.context();
+ 
 -            long topVer = cctx.affinity().affinityTopologyVersion();
++            AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+ 
+             GridCacheVersion ver = cctx.versions().next(topVer);
+ 
+             for (Map.Entry<KeyCacheObject, CacheObject> e : entries) {
+                 try {
+                     e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+ 
+                     GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
+ 
+                     entry.unswap(true, false);
+ 
+                     entry.initialValue(e.getValue(),
+                         ver,
+                         CU.TTL_ETERNAL,
+                         CU.EXPIRE_TIME_ETERNAL,
+                         false,
+                         topVer,
+                         GridDrType.DR_LOAD);
+ 
+                     cctx.evicts().touch(entry, topVer);
+                 }
+                 catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
+                     // No-op.
+                 }
+                 catch (IgniteCheckedException ex) {
+                     IgniteLogger log = cache.unwrap(Ignite.class).log();
+ 
+                     U.error(log, "Failed to set initial value for cache entry: " + e, ex);
+                 }
+             }
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index fd59174,db550c3..a0618a3
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@@ -25,6 -26,7 +25,7 @@@ import org.apache.ignite.configuration.
  import org.apache.ignite.igfs.*;
  import org.apache.ignite.igfs.mapreduce.*;
  import org.apache.ignite.internal.*;
 -import org.apache.ignite.internal.processors.cache.*;
++import org.apache.ignite.internal.processors.query.*;
  import org.apache.ignite.internal.util.ipc.*;
  import org.apache.ignite.internal.util.typedef.*;
  import org.apache.ignite.internal.util.typedef.internal.*;
@@@ -261,27 -262,16 +262,27 @@@ public class IgfsProcessor extends Igfs
                  throw new IgniteCheckedException("Duplicate IGFS name found (check configuration and " +
                      "assign unique name to each): " + name);
  
 -            GridCacheAdapter<Object, Object> dataCache = ctx.cache().internalCache(cfg.getDataCacheName());
 +            CacheConfiguration dataCacheCfg = config(cfg.getDataCacheName());
 +            CacheConfiguration metaCacheCfg = config(cfg.getMetaCacheName());
  
 -            if (dataCache == null)
 +            if (dataCacheCfg == null)
                  throw new IgniteCheckedException("Data cache is not configured locally for IGFS: " + cfg);
  
-             if (dataCacheCfg.isQueryIndexEnabled())
 -            GridCache<Object, Object> metaCache = ctx.cache().cache(cfg.getMetaCacheName());
++            if (GridQueryProcessor.isEnabled(dataCacheCfg))
 +                throw new IgniteCheckedException("IGFS data cache cannot start with enabled query indexing.");
  
 -            if (metaCache == null)
 +            if (dataCacheCfg.getAtomicityMode() != TRANSACTIONAL)
 +                throw new IgniteCheckedException("Data cache should be transactional: " + cfg.getDataCacheName());
 +
 +            if (metaCacheCfg == null)
                  throw new IgniteCheckedException("Metadata cache is not configured locally for IGFS: " + cfg);
  
-             if (metaCacheCfg.isQueryIndexEnabled())
++            if (GridQueryProcessor.isEnabled(metaCacheCfg))
 +                throw new IgniteCheckedException("IGFS metadata cache cannot start with enabled query indexing.");
 +
 +            if (metaCacheCfg.getAtomicityMode() != TRANSACTIONAL)
 +                throw new IgniteCheckedException("Meta cache should be transactional: " + cfg.getMetaCacheName());
 +
              if (F.eq(cfg.getDataCacheName(), cfg.getMetaCacheName()))
                  throw new IgniteCheckedException("Cannot use same cache as both data and meta cache: " + cfg.getName());
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 2208341,fb8f4b8..bdd9de2
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@@ -18,8 -18,9 +18,10 @@@
  package org.apache.ignite.internal.processors.query;
  
  import org.apache.ignite.*;
+ import org.apache.ignite.cache.query.*;
+ import org.apache.ignite.configuration.*;
  import org.apache.ignite.internal.*;
 +import org.apache.ignite.internal.processors.cache.*;
  import org.apache.ignite.internal.processors.cache.query.*;
  import org.apache.ignite.internal.util.lang.*;
  import org.apache.ignite.lang.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
index ebeef17,6bda0ad..a138a2a
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerAbstractSelfTest.java
@@@ -112,9 -111,8 +111,8 @@@ public class IgfsFragmentizerAbstractSe
          cfg.setCacheMode(PARTITIONED);
          cfg.setBackups(0);
          cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(IGFS_GROUP_SIZE));
 -        cfg.setDistributionMode(CacheDistributionMode.PARTITIONED_ONLY);
 +        cfg.setNearConfiguration(null);
          cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         cfg.setQueryIndexEnabled(false);
          cfg.setAtomicityMode(TRANSACTIONAL);
  
          return cfg;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
index 5b9b11b,f9ff6b4..1ad7cb9
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManagerAliveCacheSelfTest.java
@@@ -34,8 -33,9 +34,8 @@@ import org.apache.ignite.testframework.
  import java.util.*;
  import java.util.concurrent.*;
  
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  
  /**
@@@ -82,9 -82,8 +82,8 @@@ public class GridDiscoveryManagerAliveC
  
          cCfg.setCacheMode(PARTITIONED);
          cCfg.setBackups(1);
 -        cCfg.setDistributionMode(NEAR_PARTITIONED);
 +        cCfg.setNearConfiguration(new NearCacheConfiguration());
-         cCfg.setPreloadMode(SYNC);
-         cCfg.setQueryIndexEnabled(false);
+         cCfg.setRebalanceMode(SYNC);
          cCfg.setWriteSynchronizationMode(FULL_SYNC);
  
          TcpDiscoverySpi disc = new TcpDiscoverySpi();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 364ac38,0c80ed6..186bd46
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@@ -4150,31 -4094,197 +4149,226 @@@ public abstract class GridCacheAbstract
      }
  
      /**
+      * @throws Exception If failed.
+      */
+     public void testLocalClearKey() throws Exception {
+         addKeys();
+ 
+         String keyToRmv = "key" + 25;
+ 
+         Ignite g = primaryIgnite(keyToRmv);
+ 
+         g.<String, Integer>jcache(null).localClear(keyToRmv);
+ 
+         checkLocalRemovedKey(keyToRmv);
+ 
+         g.<String, Integer>jcache(null).put(keyToRmv, 1);
+ 
+         String keyToEvict = "key" + 30;
+ 
+         g = primaryIgnite(keyToEvict);
+ 
+         g.<String, Integer>jcache(null).localEvict(Collections.singleton(keyToEvict));
+ 
+         g.<String, Integer>jcache(null).localClear(keyToEvict);
+ 
+         checkLocalRemovedKey(keyToEvict);
+     }
+ 
+     /**
+      * @param keyToRmv Removed key.
+      */
+     private void checkLocalRemovedKey(String keyToRmv) {
+         for (int i = 0; i < 500; ++i) {
+             String key = "key" + i;
+ 
+             boolean found = primaryIgnite(key).jcache(null).localPeek(key) != null;
+ 
+             if (keyToRmv.equals(key)) {
+                 Collection<ClusterNode> nodes = grid(0).affinity(null).mapKeyToPrimaryAndBackups(key);
+ 
+                 for (int j = 0; j < gridCount(); ++j) {
+                     if (nodes.contains(grid(j).localNode()) && grid(j) != primaryIgnite(key))
+                         assertTrue("Not found on backup removed key ", grid(j).jcache(null).localPeek(key) != null);
+                 }
+ 
+                 assertFalse("Found removed key " + key, found);
+             }
+             else
+                 assertTrue("Not found key " + key, found);
+         }
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testLocalClearKeys() throws Exception {
+         Map<String, List<String>> keys = addKeys();
+ 
+         Ignite g = grid(0);
+ 
+         Set<String> keysToRmv = new HashSet<>();
+ 
+         for (int i = 0; i < gridCount(); ++i) {
+             List<String> gridKeys = keys.get(grid(i).name());
+ 
+             if (gridKeys.size() > 2) {
+                 keysToRmv.add(gridKeys.get(0));
+ 
+                 keysToRmv.add(gridKeys.get(1));
+ 
+                 g = grid(i);
+ 
+                 break;
+             }
+         }
+ 
+         assert keysToRmv.size() > 1;
+ 
+         g.<String, Integer>jcache(null).localClearAll(keysToRmv);
+ 
+         for (int i = 0; i < 500; ++i) {
+             String key = "key" + i;
+ 
+             boolean found = primaryIgnite(key).jcache(null).localPeek(key) != null;
+ 
+             if (keysToRmv.contains(key))
+                 assertFalse("Found removed key " + key, found);
+             else
+                 assertTrue("Not found key " + key, found);
+         }
+     }
+ 
+     /**
+      * Add 500 keys to cache only on primaries nodes.
+      *
+      * @return Map grid's name to its primary keys.
+      */
+     private Map<String, List<String>> addKeys() {
+         // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
+         // because some of them were blocked due to having readers.
+         Map<String, List<String>> keys = new HashMap<>();
+ 
+         for (int i = 0; i < gridCount(); ++i)
+             keys.put(grid(i).name(), new ArrayList<String>());
+ 
+         for (int i = 0; i < 500; ++i) {
+             String key = "key" + i;
+ 
+             Ignite g = primaryIgnite(key);
+ 
+             g.jcache(null).put(key, "value" + i);
+ 
+             keys.get(g.name()).add(key);
+         }
+ 
+         return keys;
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testGlobalClearKey() throws Exception {
+         testGlobalClearKey(false, Arrays.asList("key25"));
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testGlobalClearKeyAsync() throws Exception {
+         testGlobalClearKey(true, Arrays.asList("key25"));
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testGlobalClearKeys() throws Exception {
+         testGlobalClearKey(false, Arrays.asList("key25", "key100", "key150"));
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testGlobalClearKeysAsync() throws Exception {
+         testGlobalClearKey(true, Arrays.asList("key25", "key100", "key150"));
+     }
+ 
+     /**
+      * @param async If {@code true} uses async method.
+      * @param keysToRmv Keys to remove.
+      * @throws Exception If failed.
+      */
+     protected void testGlobalClearKey(boolean async, Collection<String> keysToRmv) throws Exception {
+         // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
+         // because some of them were blocked due to having readers.
+         for (int i = 0; i < 500; ++i) {
+             String key = "key" + i;
+ 
+             Ignite g = primaryIgnite(key);
+ 
+             g.jcache(null).put(key, "value" + i);
+         }
+ 
+         if (async) {
+             IgniteCache<String, Integer> asyncCache = jcache().withAsync();
+ 
+             if (keysToRmv.size() == 1)
+                 asyncCache.clear(F.first(keysToRmv));
+             else
+                 asyncCache.clearAll(new HashSet<>(keysToRmv));
+ 
+             asyncCache.future().get();
+         }
+         else {
+             if (keysToRmv.size() == 1)
+                 jcache().clear(F.first(keysToRmv));
+             else
+                 jcache().clearAll(new HashSet<>(keysToRmv));
+         }
+ 
+         for (int i = 0; i < 500; ++i) {
+             String key = "key" + i;
+ 
+             boolean found = false;
+ 
+             for (int j = 0; j < gridCount(); j++) {
+                 if (jcache(j).localPeek(key) != null)
+                     found = true;
+             }
+ 
+             if (!keysToRmv.contains(key))
+                 assertTrue("Not found key " + key, found);
+             else
+                 assertFalse("Found removed key " + key, found);
+         }
+     }
++
++    /**
 +     *
 +     */
 +    protected CacheStartMode cacheStartType() {
 +        String mode = System.getProperty("cache.start.mode");
 +
 +        if (CacheStartMode.NODES_THEN_CACHES.name().equalsIgnoreCase(mode))
 +            return CacheStartMode.NODES_THEN_CACHES;
 +
 +        if (CacheStartMode.ONE_BY_ONE.name().equalsIgnoreCase(mode))
 +            return CacheStartMode.ONE_BY_ONE;
 +
 +        return CacheStartMode.STATIC;
 +    }
 +
 +    /**
 +     *
 +     */
 +    public enum CacheStartMode {
 +        /** Start caches together nodes (not dynamically) */
 +        STATIC,
 +
 +        /** */
 +        NODES_THEN_CACHES,
 +
 +        /** */
 +        ONE_BY_ONE
 +    }
  }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
index b4adbf4,3d7ed58..54c9316
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractLocalStoreSelfTest.java
@@@ -119,9 -119,9 +119,9 @@@ public abstract class GridCacheAbstract
          cacheCfg.setName(cacheName);
          cacheCfg.setCacheMode(getCacheMode());
          cacheCfg.setAtomicityMode(getAtomicMode());
 -        cacheCfg.setDistributionMode(getDistributionMode());
 +        cacheCfg.setNearConfiguration(nearConfiguration());
          cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-         cacheCfg.setPreloadMode(SYNC);
+         cacheCfg.setRebalanceMode(SYNC);
  
          if (gridName.endsWith("1"))
              cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory<CacheStore>(LOCAL_STORE_1));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index e4e2fc8,dacbf63..d3d83e2
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@@ -252,7 -251,8 +252,8 @@@ public abstract class GridCacheAbstract
          cfg.setCacheMode(cacheMode());
          cfg.setAtomicityMode(atomicityMode());
          cfg.setWriteSynchronizationMode(writeSynchronization());
 -        cfg.setDistributionMode(distributionMode());
 +        cfg.setNearConfiguration(nearConfiguration());
+         cfg.setIndexedTypes(indexedTypes());
  
          if (cacheMode() == PARTITIONED)
              cfg.setBackups(1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
index f7f05d6,2d551a1..b28b268
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractUsersAffinityMapperSelfTest.java
@@@ -64,9 -66,9 +66,9 @@@ public abstract class GridCacheAbstract
          cacheCfg.setName(null);
          cacheCfg.setCacheMode(getCacheMode());
          cacheCfg.setAtomicityMode(getAtomicMode());
 -        cacheCfg.setDistributionMode(getDistributionMode());
 +        cacheCfg.setNearConfiguration(nearConfiguration());
          cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         cacheCfg.setPreloadMode(CachePreloadMode.SYNC);
+         cacheCfg.setRebalanceMode(SYNC);
          cacheCfg.setAffinityMapper(AFFINITY_MAPPER);
  
          cfg.setCacheConfiguration(cacheCfg);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java
index 9b32b69,69bcef2..fa35de3
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreAbstractTest.java
@@@ -88,7 -88,8 +88,7 @@@ public abstract class GridCacheBasicSto
          cc.setWriteSynchronizationMode(FULL_SYNC);
          cc.setSwapEnabled(false);
          cc.setAtomicityMode(atomicityMode());
-         cc.setPreloadMode(SYNC);
 -        cc.setDistributionMode(distributionMode());
+         cc.setRebalanceMode(SYNC);
  
          cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
          cc.setReadThrough(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
index 1a42930,d3de9ff..f0212bb
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java
@@@ -45,8 -46,9 +46,8 @@@ import java.util.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.*;
  
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  import static org.apache.ignite.transactions.TransactionConcurrency.*;
  import static org.apache.ignite.transactions.TransactionIsolation.*;
@@@ -110,11 -112,13 +111,11 @@@ public class GridCacheConcurrentTxMulti
              CacheConfiguration cc = defaultCacheConfiguration();
  
              cc.setCacheMode(mode);
 -            cc.setDistributionMode(PARTITIONED_ONLY);
              cc.setEvictionPolicy(new CacheLruEvictionPolicy(1000));
              cc.setEvictSynchronized(false);
 -            cc.setEvictNearSynchronized(false);
              cc.setSwapEnabled(false);
              cc.setWriteSynchronizationMode(FULL_SYNC);
-             cc.setPreloadMode(NONE);
+             cc.setRebalanceMode(NONE);
  
              c.setCacheConfiguration(cc);
          }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
index 16ebacd,d412e77..24b69ec
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java
@@@ -46,8 -46,9 +46,8 @@@ import java.util.*
  import java.util.concurrent.*;
  
  import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  import static org.apache.ignite.configuration.DeploymentMode.*;
  

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentOffHeapSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java
index 302d070,83a23b7..bf56659
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentSelfTest.java
@@@ -31,8 -31,9 +31,8 @@@ import org.apache.ignite.testframework.
  import java.util.*;
  
  import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  import static org.apache.ignite.configuration.DeploymentMode.*;
  
@@@ -100,9 -101,9 +100,9 @@@ public class GridCacheDeploymentSelfTes
  
          cfg.setCacheMode(PARTITIONED);
          cfg.setWriteSynchronizationMode(FULL_SYNC);
-         cfg.setPreloadMode(SYNC);
+         cfg.setRebalanceMode(SYNC);
          cfg.setAtomicityMode(TRANSACTIONAL);
 -        cfg.setDistributionMode(NEAR_PARTITIONED);
 +        cfg.setNearConfiguration(new NearCacheConfiguration());
          cfg.setBackups(1);
  
          return cfg;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
index 3098a26,d890da9..858de64
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGetAndTransformStoreAbstractTest.java
@@@ -33,7 -33,8 +33,7 @@@ import java.util.concurrent.*
  import java.util.concurrent.atomic.*;
  
  import static org.apache.ignite.cache.CacheAtomicityMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  
  /**
@@@ -85,7 -86,8 +85,7 @@@ public abstract class GridCacheGetAndTr
          cc.setWriteSynchronizationMode(FULL_SYNC);
          cc.setSwapEnabled(false);
          cc.setAtomicityMode(atomicityMode());
-         cc.setPreloadMode(SYNC);
 -        cc.setDistributionMode(distributionMode());
+         cc.setRebalanceMode(SYNC);
  
          cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
          cc.setReadThrough(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java
index 10afc73,6950d5c..0680454
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java
@@@ -34,8 -34,9 +34,8 @@@ import java.util.concurrent.atomic.*
  
  import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
  import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  
  /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java
index f274f36,d21d20c..a928ea3
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java
@@@ -71,9 -72,8 +71,8 @@@ public class GridCacheKeyCheckSelfTest 
  
          cfg.setCacheMode(PARTITIONED);
          cfg.setBackups(1);
 -        cfg.setDistributionMode(distributionMode());
 +        cfg.setNearConfiguration(nearConfiguration());
          cfg.setWriteSynchronizationMode(FULL_SYNC);
-         cfg.setQueryIndexEnabled(false);
          cfg.setAtomicityMode(atomicityMode);
  
          return cfg;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
index 0f8984c,2cf509e..a03798e
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java
@@@ -75,9 -76,8 +75,8 @@@ public class GridCacheLeakTest extends 
  
          cfg.setCacheMode(PARTITIONED);
          cfg.setBackups(1);
 -        cfg.setDistributionMode(PARTITIONED_ONLY);
 +        cfg.setNearConfiguration(null);
          cfg.setWriteSynchronizationMode(FULL_SYNC);
-         cfg.setQueryIndexEnabled(false);
          cfg.setAtomicityMode(atomicityMode);
  
          return cfg;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java
index ce6153b,0fcbaef..d360df5
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java
@@@ -33,8 -32,9 +33,8 @@@ import org.apache.ignite.transactions.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.*;
  
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  import static org.apache.ignite.transactions.TransactionConcurrency.*;
  import static org.apache.ignite.transactions.TransactionIsolation.*;
@@@ -74,12 -74,10 +74,12 @@@ public class GridCacheMultiUpdateLockSe
  
          cfg.setCacheMode(PARTITIONED);
          cfg.setBackups(1);
 -        cfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
 +
 +        if (!nearEnabled)
 +            cfg.setNearConfiguration(null);
  
          cfg.setWriteSynchronizationMode(FULL_SYNC);
-         cfg.setPreloadMode(SYNC);
+         cfg.setRebalanceMode(SYNC);
  
          return cfg;
      }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java
index 690d4e5,bce17b7..52076cd
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java
@@@ -72,8 -73,7 +72,7 @@@ public class GridCacheOffHeapTest exten
          cacheCfg.setWriteSynchronizationMode(FULL_ASYNC);
          cacheCfg.setSwapEnabled(false);
          cacheCfg.setCacheMode(mode);
-         cacheCfg.setQueryIndexEnabled(false);
 -        cacheCfg.setDistributionMode(PARTITIONED_ONLY);
 +        cacheCfg.setNearConfiguration(null);
          cacheCfg.setStartSize(startSize);
  
          if (onheap > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c82d4d8e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
index 8c0743a,7b429b2..355893c
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java
@@@ -35,8 -35,9 +35,8 @@@ import org.apache.ignite.testframework.
  import java.util.concurrent.atomic.*;
  
  import static org.apache.ignite.cache.CacheAtomicityMode.*;
 -import static org.apache.ignite.cache.CacheDistributionMode.*;
  import static org.apache.ignite.cache.CacheMode.*;
- import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheRebalanceMode.*;
  import static org.apache.ignite.configuration.DeploymentMode.*;
  
  /**
@@@ -95,11 -95,12 +94,10 @@@ public class GridCacheP2PUndeploySelfTe
  
          partCacheCfg.setName("partitioned");
          partCacheCfg.setCacheMode(PARTITIONED);
-         partCacheCfg.setPreloadMode(mode);
+         partCacheCfg.setRebalanceMode(mode);
          partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(11, 1));
          partCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-         partCacheCfg.setQueryIndexEnabled(false);
 -        partCacheCfg.setEvictNearSynchronized(false);
          partCacheCfg.setAtomicityMode(TRANSACTIONAL);
 -        partCacheCfg.setDistributionMode(NEAR_PARTITIONED);
  
          if (offheap)
              partCacheCfg.setOffHeapMaxMemory(OFFHEAP);


Mime
View raw message