ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/17] ignite git commit: IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails (DataStreamer data loss at unstable topology in !allowOverwrite mode fixed)
Date Fri, 11 Nov 2016 14:18:54 GMT
IGNITE-500 CacheLoadingConcurrentGridStartSelfTest fails (DataStreamer data loss at unstable topology in !allowOverwrite mode fixed)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7499828
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7499828
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7499828

Branch: refs/heads/ignite-4154-opt2
Commit: b7499828c928e02e8e554f960f3754e4d08bfbe0
Parents: 8b59f4e
Author: Anton Vinogradov <av@apache.org>
Authored: Thu Nov 10 16:10:21 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Thu Nov 10 16:10:21 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteDataStreamer.java   |   2 +-
 .../processors/cache/GridCacheMapEntry.java     |   5 +-
 .../processors/cache/GridCacheMvccManager.java  |  77 +++
 .../GridCachePartitionExchangeManager.java      |   5 +
 .../cache/GridCacheSharedContext.java           |   1 +
 .../datastreamer/DataStreamProcessor.java       | 104 +++-
 .../datastreamer/DataStreamerImpl.java          | 603 ++++++++++++++-----
 .../ignite/internal/util/GridLogThrottle.java   |  29 +-
 .../cache/IgniteCacheDynamicStopSelfTest.java   |  48 +-
 ...CacheLoadingConcurrentGridStartSelfTest.java | 251 +++++++-
 ...ncurrentGridStartSelfTestAllowOverwrite.java |  30 +
 .../DataStreamProcessorSelfTest.java            |   4 +-
 .../datastreamer/DataStreamerImplSelfTest.java  | 170 ++++--
 .../DataStreamerMultiThreadedSelfTest.java      |   2 -
 .../datastreamer/DataStreamerTimeoutTest.java   |  92 ++-
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +
 16 files changed, 1120 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 484fee9..4e00d66 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -291,7 +291,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
      * @throws IllegalStateException If grid has been concurrently stopped or
      *      {@link #close(boolean)} has already been called on streamer.
      */
-    public IgniteFuture<?> removeData(K key)  throws CacheException, IgniteInterruptedException, IllegalStateException;
+    public IgniteFuture<?> removeData(K key) throws CacheException, IgniteInterruptedException, IllegalStateException;
 
     /**
      * Adds data for streaming on remote node. This method can be called from multiple

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5996672..950153f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3444,11 +3444,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 if (val == null) {
                     skipQryNtf = true;
 
-                    if (cctx.deferredDelete() && !isInternal()) {
-                        assert !deletedUnlocked();
-
+                    if (cctx.deferredDelete() && !deletedUnlocked() && !isInternal())
                         deletedUnlocked(true);
-                    }
                 }
                 else if (deletedUnlocked())
                     deletedUnlocked(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c4db01e..c57e17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -108,6 +109,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
         new ConcurrentHashMap8<>();
 
+    /** Pending data streamer futures. */
+    private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
+
     /** */
     private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>();
 
@@ -446,6 +450,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @return Collection of pending data streamer futures.
+     */
+    public Collection<DataStreamerFuture> dataStreamerFutures() {
+        return dataStreamerFuts;
+    }
+
+    /**
      * Gets future by given future ID.
      *
      * @param futVer Future ID.
@@ -476,6 +487,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param topVer Topology version.
+     */
+    public GridFutureAdapter addDataStreamerFuture(AffinityTopologyVersion topVer) {
+        final DataStreamerFuture fut = new DataStreamerFuture(topVer);
+
+        boolean add = dataStreamerFuts.add(fut);
+
+        assert add;
+
+        return fut;
+    }
+
+    /**
+
+    /**
      * Adds future.
      *
      * @param fut Future.
@@ -1056,6 +1082,22 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     *
+     * @return Finish update future.
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteInternalFuture<?> finishDataStreamerUpdates() {
+        GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>();
+
+        for (IgniteInternalFuture fut : dataStreamerFuts)
+            res.add(fut);
+
+        res.markInitialized();
+
+        return res;
+    }
+
+    /**
      * @param keys Key for which locks should be released.
      * @param cacheId Cache ID.
      * @param topVer Topology version.
@@ -1294,4 +1336,39 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
                 CachePartialUpdateCheckedException.class.isAssignableFrom(cls);
         }
     }
+
+    /**
+     *
+     */
+    private class DataStreamerFuture extends GridFutureAdapter<Void> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Topology version. Instance field for toString method only. */
+        @GridToStringInclude
+        private final AffinityTopologyVersion topVer;
+
+        /**
+         * @param topVer Topology version.
+         */
+        DataStreamerFuture(AffinityTopologyVersion topVer) {
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                dataStreamerFuts.remove(this);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStreamerFuture.class, this, super.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a901e2a..00d2d16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1309,6 +1309,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             for (GridCacheFuture<?> fut : mvcc.atomicFutures())
                 U.warn(log, ">>> " + fut);
 
+            U.warn(log, "Pending data streamer futures:");
+
+            for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
+                U.warn(log, ">>> " + fut);
+
             if (tm != null) {
                 U.warn(log, "Pending transaction deadlock detection futures:");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 8f39235..117a5c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -636,6 +636,7 @@ public class GridCacheSharedContext<K, V> {
         f.add(mvcc().finishExplicitLocks(topVer));
         f.add(tm().finishTxs(topVer));
         f.add(mvcc().finishAtomicUpdates(topVer));
+        f.add(mvcc().finishDataStreamerUpdates());
 
         f.markInitialized();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 7663735..32fda87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.DelayQueue;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -29,13 +30,18 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.stream.StreamReceiver;
 import org.apache.ignite.thread.IgniteThread;
@@ -288,32 +294,94 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
                 return;
             }
 
-            Collection<DataStreamerEntry> col = req.entries();
+            localUpdate(nodeId, req, updater, topic);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
 
-            DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
-                log,
-                req.cacheName(),
-                col,
-                req.ignoreDeploymentOwnership(),
-                req.skipStore(),
-                req.keepBinary(),
-                updater);
+    /**
+     * @param nodeId Node id.
+     * @param req Request.
+     * @param updater Updater.
+     * @param topic Topic.
+     */
+    private void localUpdate(final UUID nodeId,
+        final DataStreamerRequest req,
+        final StreamReceiver<K, V> updater,
+        final Object topic) {
+        final boolean allowOverwrite = !(updater instanceof DataStreamerImpl.IsolatedUpdater);
 
-            Exception err = null;
+        try {
+            GridCacheAdapter cache = ctx.cache().internalCache(req.cacheName());
+
+            if (cache == null)
+                throw new IgniteCheckedException("Cache not created or already destroyed.");
+
+            GridCacheContext cctx = cache.context();
+
+            DataStreamerUpdateJob job = null;
+
+            GridFutureAdapter waitFut = null;
+
+            if (!allowOverwrite)
+                cctx.topology().readLock();
 
             try {
-                job.call();
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to finish update job.", e);
+                GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
 
-                err = e;
+                AffinityTopologyVersion topVer = fut.topologyVersion();
+
+                if (!allowOverwrite && !topVer.equals(req.topologyVersion())) {
+                    Exception err = new IgniteCheckedException(
+                        "DataStreamer will retry data transfer at stable topology " +
+                            "[reqTop=" + req.topologyVersion() + ", topVer=" + topVer + ", node=remote]");
+
+                    sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+                }
+                else if (allowOverwrite || fut.isDone()) {
+                    job = new DataStreamerUpdateJob(ctx,
+                        log,
+                        req.cacheName(),
+                        req.entries(),
+                        req.ignoreDeploymentOwnership(),
+                        req.skipStore(),
+                        req.keepBinary(),
+                        updater);
+
+                    waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer);
+                }
+                else {
+                    fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+                            localUpdate(nodeId, req, updater, topic);
+                        }
+                    });
+                }
+            }
+            finally {
+                if (!allowOverwrite)
+                    cctx.topology().readUnlock();
             }
 
-            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+            if (job != null) {
+                try {
+                    job.call();
+
+                    sendResponse(nodeId, topic, req.requestId(), null, req.forceLocalDeployment());
+                }
+                finally {
+                    if (waitFut != null)
+                        waitFut.onDone();
+                }
+            }
         }
-        finally {
-            busyLock.leaveBusy();
+        catch (Throwable e) {
+            sendResponse(nodeId, topic, req.requestId(), e, req.forceLocalDeployment());
+
+            if (e instanceof Error)
+                throw (Error)e;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index a6065dd..443783b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
@@ -39,15 +40,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.CacheException;
 import javax.cache.expiry.ExpiryPolicy;
-
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteDataStreamerTimeoutException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteDataStreamerTimeoutException;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -72,11 +73,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheGateway;
 import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.processors.dr.GridDrType;
@@ -92,6 +95,8 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.GPC;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
@@ -102,6 +107,7 @@ import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.stream.StreamReceiver;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -113,12 +119,15 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUB
  */
 @SuppressWarnings("unchecked")
 public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
-    /** Default policy reoslver. */
+    /** Default policy resolver. */
     private static final DefaultIoPolicyResolver DFLT_IO_PLC_RSLVR = new DefaultIoPolicyResolver();
 
     /** Isolated receiver. */
     private static final StreamReceiver ISOLATED_UPDATER = new IsolatedUpdater();
 
+    /** Amount of permissions should be available to continue new data processing. */
+    private static final int REMAP_SEMAPHORE_PERMISSIONS_COUNT = Integer.MAX_VALUE;
+
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;
 
@@ -178,6 +187,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** {@code True} if data loader has been cancelled. */
     private volatile boolean cancelled;
 
+    /** Fail counter. */
+    private final LongAdder8 failCntr = new LongAdder8();
+
     /** Active futures of this data loader. */
     @GridToStringInclude
     private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
@@ -189,6 +201,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             boolean rmv = activeFuts.remove(t);
 
             assert rmv;
+
+            Throwable err = t.error();
+
+            if (err != null && !(err instanceof IgniteClientDisconnectedCheckedException)) {
+                LT.error(log, t.error(), "DataStreamer operation failed.", true);
+
+                failCntr.increment();
+
+                cancelled = true;
+            }
         }
     };
 
@@ -231,6 +253,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */
     private static boolean isWarningPrinted;
 
+    /** Allows to pause new data processing while failed data processing in progress. */
+    private final Semaphore remapSem = new Semaphore(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+
+    /** */
+    private final ConcurrentLinkedDeque<Runnable> dataToRemap = new ConcurrentLinkedDeque<>();
+
+    /** */
+    private final AtomicBoolean remapOwning = new AtomicBoolean();
+
     /**
      * @param ctx Grid kernal context.
      * @param cacheName Cache name.
@@ -301,7 +332,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 Buffer buf = bufMappings.get(nodeId);
 
                 if (buf != null)
-                    buf.onResponse(res);
+                    buf.onResponse(res, nodeId);
 
                 else if (log.isDebugEnabled())
                     log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
@@ -314,6 +345,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         fut = new DataStreamerFuture(this);
 
         publicFut = new IgniteCacheFutureImpl<>(fut);
+
+        GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
+
+        if (cache == null) { // Possible, cache is not configured on node.
+            assert ccfg != null;
+
+            if (ccfg.getCacheMode() == CacheMode.LOCAL)
+                throw new CacheException("Impossible to load Local cache configured remotely.");
+
+            ctx.grid().getOrCreateCache(ccfg);
+        }
     }
 
     /**
@@ -358,6 +400,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             throw new IllegalStateException("Data streamer has been closed.");
         }
+        else if (cancelled) {
+            busyLock.leaveBusy();
+
+            throw new IllegalStateException("Data streamer has been closed.");
+        }
     }
 
     /**
@@ -633,6 +680,37 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     *
+     */
+    private void acquireRemapSemaphore() throws IgniteInterruptedCheckedException {
+        try {
+            if (remapSem.availablePermits() != REMAP_SEMAPHORE_PERMISSIONS_COUNT) {
+                if (timeout == DFLT_UNLIMIT_TIMEOUT) {
+                    // Wait until failed data being processed.
+                    remapSem.acquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+
+                    remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+                }
+                else {
+                    // Wait until failed data being processed.
+                    boolean res = remapSem.tryAcquire(REMAP_SEMAPHORE_PERMISSIONS_COUNT, timeout, TimeUnit.MILLISECONDS);
+
+                    if (res)
+                        remapSem.release(REMAP_SEMAPHORE_PERMISSIONS_COUNT);
+                    else
+                        throw new IgniteDataStreamerTimeoutException("Data streamer exceeded timeout " +
+                            "while was waiting for failed data resending finished.");
+                }
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
+
+    /**
      * @param entries Entries.
      * @param resFut Result future.
      * @param activeKeys Active keys.
@@ -644,170 +722,266 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         @Nullable final Collection<KeyCacheObjectWrapper> activeKeys,
         final int remaps
     ) {
-        assert entries != null;
+        try {
+            assert entries != null;
 
-        if (!isWarningPrinted) {
-            synchronized (this) {
-                if (!allowOverwrite() && !isWarningPrinted) {
-                    U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " +
-                        "(to change, set allowOverwrite to true)");
-                }
+            final boolean remap = remaps > 0;
 
-                isWarningPrinted = true;
+            if (!remap) { // Failed data should be processed prior to new data.
+                acquireRemapSemaphore();
             }
-        }
 
-        Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();
+            if (!isWarningPrinted) {
+                synchronized (this) {
+                    if (!allowOverwrite() && !isWarningPrinted) {
+                        U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " +
+                            "(to change, set allowOverwrite to true)");
+                    }
 
-        boolean initPda = ctx.deploy().enabled() && jobPda == null;
+                    isWarningPrinted = true;
+                }
+            }
 
-        AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion();
+            Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();
 
-        for (DataStreamerEntry entry : entries) {
-            List<ClusterNode> nodes;
+            boolean initPda = ctx.deploy().enabled() && jobPda == null;
 
-            try {
-                KeyCacheObject key = entry.getKey();
+            GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
 
-                assert key != null;
+            if (cache == null)
+                throw new IgniteCheckedException("Cache not created or already destroyed.");
 
-                if (initPda) {
-                    if (cacheObjCtx.addDeploymentInfo())
-                        jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
-                            entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
-                            rcvr);
-                    else if (rcvr != null)
-                        jobPda = new DataStreamerPda(rcvr);
+            GridCacheContext cctx = cache.context();
 
-                    initPda = false;
-                }
+            GridCacheGateway gate = null;
 
-                nodes = nodes(key, topVer);
-            }
-            catch (IgniteCheckedException e) {
-                resFut.onDone(e);
+            if (!allowOverwrite() && !cctx.isLocal()) { // Cases where cctx required.
+                gate = cctx.gate();
 
-                return;
+                gate.enter();
             }
 
-            if (F.isEmpty(nodes)) {
-                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
-                    "(no nodes with cache found in topology) [infos=" + entries.size() +
-                    ", cacheName=" + cacheName + ']'));
-
-                return;
-            }
+            try {
+                AffinityTopologyVersion topVer = allowOverwrite() || cctx.isLocal() ?
+                        ctx.cache().context().exchange().readyAffinityVersion() :
+                        cctx.topology().topologyVersion();
 
-            for (ClusterNode node : nodes) {
-                Collection<DataStreamerEntry> col = mappings.get(node);
+                for (DataStreamerEntry entry : entries) {
+                    List<ClusterNode> nodes;
 
-                if (col == null)
-                    mappings.put(node, col = new ArrayList<>());
+                    try {
+                        KeyCacheObject key = entry.getKey();
 
-                col.add(entry);
-            }
-        }
+                        assert key != null;
 
-        for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
-            final UUID nodeId = e.getKey().id();
+                        if (initPda) {
+                            if (cacheObjCtx.addDeploymentInfo())
+                                jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
+                                    entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
+                                    rcvr);
+                            else if (rcvr != null)
+                                jobPda = new DataStreamerPda(rcvr);
 
-            Buffer buf = bufMappings.get(nodeId);
+                            initPda = false;
+                        }
 
-            if (buf == null) {
-                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+                        nodes = nodes(key, topVer, cctx);
+                    }
+                    catch (IgniteCheckedException e) {
+                        resFut.onDone(e);
 
-                if (old != null)
-                    buf = old;
-            }
+                        return;
+                    }
 
-            final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+                    if (F.isEmpty(nodes)) {
+                        resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+                            "(no nodes with cache found in topology) [infos=" + entries.size() +
+                            ", cacheName=" + cacheName + ']'));
 
-            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> t) {
-                    try {
-                        t.get();
+                        return;
+                    }
 
-                        if (activeKeys != null) {
-                            for (DataStreamerEntry e : entriesForNode)
-                                activeKeys.remove(new KeyCacheObjectWrapper(e.getKey()));
+                    for (ClusterNode node : nodes) {
+                        Collection<DataStreamerEntry> col = mappings.get(node);
 
-                            if (activeKeys.isEmpty())
-                                resFut.onDone();
-                        }
-                        else {
-                            assert entriesForNode.size() == 1;
+                        if (col == null)
+                            mappings.put(node, col = new ArrayList<>());
 
-                            // That has been a single key,
-                            // so complete result future right away.
-                            resFut.onDone();
-                        }
+                        col.add(entry);
                     }
-                    catch (IgniteClientDisconnectedCheckedException e1) {
-                        if (log.isDebugEnabled())
-                            log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');
+                }
 
-                        resFut.onDone(e1);
+                for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
+                    final UUID nodeId = e.getKey().id();
+
+                    Buffer buf = bufMappings.get(nodeId);
+
+                    if (buf == null) {
+                        Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+                        if (old != null)
+                            buf = old;
                     }
-                    catch (IgniteCheckedException e1) {
-                        if (log.isDebugEnabled())
-                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
 
-                        if (cancelled) {
-                            resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
-                                DataStreamerImpl.this, e1));
+                    final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+
+                    IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) {
+                            try {
+                                t.get();
+
+                                if (activeKeys != null) {
+                                    for (DataStreamerEntry e : entriesForNode)
+                                        activeKeys.remove(new KeyCacheObjectWrapper(e.getKey()));
+
+                                    if (activeKeys.isEmpty())
+                                        resFut.onDone();
+                                }
+                                else {
+                                    assert entriesForNode.size() == 1;
+
+                                    // That has been a single key,
+                                    // so complete result future right away.
+                                    resFut.onDone();
+                                }
+                            }
+                            catch (IgniteClientDisconnectedCheckedException e1) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                                resFut.onDone(e1);
+                            }
+                            catch (IgniteCheckedException e1) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                                if (cancelled) {
+                                    resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                                        DataStreamerImpl.this, e1));
+                                }
+                                else if (remaps + 1 > maxRemapCnt) {
+                                    resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+                                        + remaps, e1));
+                                }
+                                else {
+                                    try {
+                                        remapSem.acquire();
+
+                                        final Runnable r = new Runnable() {
+                                            @Override public void run() {
+                                                try {
+                                                    if (cancelled)
+                                                        throw new IllegalStateException("DataStreamer closed.");
+
+                                                    load0(entriesForNode, resFut, activeKeys, remaps + 1);
+                                                }
+                                                catch (Throwable ex) {
+                                                    resFut.onDone(
+                                                        new IgniteCheckedException("DataStreamer remapping failed. ", ex));
+                                                }
+                                                finally {
+                                                    remapSem.release();
+                                                }
+                                            }
+                                        };
+
+                                        dataToRemap.add(r);
+
+                                        if (!remapOwning.get() && remapOwning.compareAndSet(false, true)) {
+                                            ctx.closure().callLocalSafe(new GPC<Boolean>() {
+                                                @Override public Boolean call() {
+                                                    boolean locked = true;
+
+                                                    while (locked || !dataToRemap.isEmpty()) {
+                                                        if (!locked && !remapOwning.compareAndSet(false, true))
+                                                            return false;
+
+                                                        try {
+                                                            Runnable r = dataToRemap.poll();
+
+                                                            if (r != null)
+                                                                r.run();
+                                                        }
+                                                        finally {
+                                                            if (!dataToRemap.isEmpty())
+                                                                locked = true;
+                                                            else {
+                                                                remapOwning.set(false);
+
+                                                                locked = false;
+                                                            }
+                                                        }
+                                                    }
+
+                                                    return true;
+                                                }
+                                            }, true);
+                                        }
+                                    }
+                                    catch (InterruptedException e2) {
+                                        resFut.onDone(e2);
+                                    }
+                                }
+                            }
                         }
-                        else if (remaps + 1 > maxRemapCnt) {
-                            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
-                                + remaps), e1);
-                        }
-                        else
-                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
-                    }
-                }
-            };
+                    };
 
-            final GridFutureAdapter<?> f;
+                    final GridFutureAdapter<?> f;
 
-            try {
-                f = buf.update(entriesForNode, topVer, lsnr);
-            }
-            catch (IgniteInterruptedCheckedException e1) {
-                resFut.onDone(e1);
+                    try {
+                        f = buf.update(entriesForNode, topVer, lsnr, remap);
+                    }
+                    catch (IgniteInterruptedCheckedException e1) {
+                        resFut.onDone(e1);
 
-                return;
-            }
+                        return;
+                    }
 
-            if (ctx.discovery().node(nodeId) == null) {
-                if (bufMappings.remove(nodeId, buf)) {
-                    final Buffer buf0 = buf;
+                    if (ctx.discovery().node(nodeId) == null) {
+                        if (bufMappings.remove(nodeId, buf)) {
+                            final Buffer buf0 = buf;
 
-                    waitAffinityAndRun(new Runnable() {
-                        @Override public void run() {
-                            buf0.onNodeLeft();
+                            waitAffinityAndRun(new Runnable() {
+                                @Override public void run() {
+                                    buf0.onNodeLeft();
 
-                            if (f != null)
-                                f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
-                                    "(node has left): " + nodeId));
+                                    if (f != null)
+                                        f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                                            "(node has left): " + nodeId));
+                                }
+                            }, ctx.discovery().topologyVersion(), false);
                         }
-                    }, ctx.discovery().topologyVersion(), false);
+                    }
                 }
             }
+            finally {
+                if (gate != null)
+                    gate.leave();
+            }
+        }
+        catch (Exception ex) {
+            resFut.onDone(new IgniteCheckedException("DataStreamer data loading failed.", ex));
         }
     }
 
     /**
      * @param key Key to map.
      * @param topVer Topology version.
+     * @param cctx Context.
      * @return Nodes to send requests to.
      * @throws IgniteCheckedException If failed.
      */
-    private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+    private List<ClusterNode> nodes(KeyCacheObject key,
+        AffinityTopologyVersion topVer,
+        GridCacheContext cctx) throws IgniteCheckedException {
         GridAffinityProcessor aff = ctx.affinity();
 
         List<ClusterNode> res = null;
 
         if (!allowOverwrite())
-            res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer);
+            res = cctx.isLocal() ?
+                aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer) :
+                cctx.topology().nodes(cctx.affinity().partition(key), topVer);
         else {
             ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer);
 
@@ -992,7 +1166,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
      * @throws IgniteCheckedException If failed.
      */
     public void closeEx(boolean cancel) throws IgniteCheckedException {
-        closeEx(cancel, null);
+        IgniteCheckedException err = closeEx(cancel, null);
+
+        if (err != null)
+            throw err; // Throws at close().
     }
 
     /**
@@ -1000,9 +1177,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
      * @param err Error.
      * @throws IgniteCheckedException If failed.
      */
-    public void closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException {
+    private IgniteCheckedException closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException {
         if (!closed.compareAndSet(false, true))
-            return;
+            return null;
 
         busyLock.block();
 
@@ -1029,7 +1206,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             throw e;
         }
 
+        long failed = failCntr.longValue();
+
+        if (failed > 0 && err == null)
+            err = new IgniteCheckedException("Some of DataStreamer operations failed [failedCount=" + failed + "]");
+
         fut.onDone(err);
+
+        return err;
     }
 
     /**
@@ -1139,6 +1323,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         /** */
         private final Semaphore sem;
 
+        /** Batch topology. */
+        private AffinityTopologyVersion batchTopVer;
+
         /** Closure to signal on task finish. */
         @GridToStringExclude
         private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
@@ -1169,37 +1356,64 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         }
 
         /**
+         * @param remap Remapping flag.
+         */
+        private void renewBatch(boolean remap) {
+            entries = newEntries();
+            curFut = new GridFutureAdapter<>();
+
+            batchTopVer = null;
+
+            if (!remap)
+                curFut.listen(signalC);
+        }
+
+        /**
          * @param newEntries Infos.
          * @param topVer Topology version.
          * @param lsnr Listener for the operation future.
+         * @param remap Remapping flag.
          * @return Future for operation.
          * @throws IgniteInterruptedCheckedException If failed.
          */
         @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
             AffinityTopologyVersion topVer,
-            IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+            IgniteInClosure<IgniteInternalFuture<?>> lsnr,
+            boolean remap) throws IgniteInterruptedCheckedException {
             List<DataStreamerEntry> entries0 = null;
+
             GridFutureAdapter<Object> curFut0;
 
+            AffinityTopologyVersion curBatchTopVer;
+
             synchronized (this) {
                 curFut0 = curFut;
 
                 curFut0.listen(lsnr);
 
+                if (batchTopVer == null)
+                    batchTopVer = topVer;
+
+                curBatchTopVer = batchTopVer;
+
                 for (DataStreamerEntry entry : newEntries)
                     entries.add(entry);
 
                 if (entries.size() >= bufSize) {
                     entries0 = entries;
 
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>();
-                    curFut.listen(signalC);
+                    renewBatch(remap);
                 }
             }
 
-            if (entries0 != null) {
-                submit(entries0, topVer, curFut0);
+            if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) {
+                renewBatch(remap);
+
+                curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." +
+                    "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]"));
+            }
+            else if (entries0 != null) {
+                submit(entries0, curBatchTopVer, curFut0, remap);
 
                 if (cancelled)
                     curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
@@ -1227,6 +1441,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             List<DataStreamerEntry> entries0 = null;
             GridFutureAdapter<Object> curFut0 = null;
 
+            acquireRemapSemaphore();
+
             synchronized (this) {
                 if (!entries.isEmpty()) {
                     entries0 = entries;
@@ -1239,7 +1455,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             if (entries0 != null)
-                submit(entries0, null, curFut0);
+                submit(entries0, batchTopVer, curFut0, false);
 
             // Create compound future for this flush.
             GridCompoundFuture<Object, Object> res = null;
@@ -1290,25 +1506,113 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         }
 
         /**
+         * @param entries Entries.
+         * @param reqTopVer Request topology version.
+         * @param curFut Current future.
+         */
+        private void localUpdate(final Collection<DataStreamerEntry> entries,
+            final AffinityTopologyVersion reqTopVer,
+            final GridFutureAdapter<Object> curFut) {
+            try {
+                GridCacheContext cctx = ctx.cache().internalCache(cacheName).context();
+
+                final boolean allowOverwrite = allowOverwrite();
+                final boolean loc = cctx.isLocal();
+
+                if (!loc && !allowOverwrite)
+                    cctx.topology().readLock();
+
+                try {
+                    GridDhtTopologyFuture fut = loc ? null : cctx.topologyVersionFuture();
+
+                    AffinityTopologyVersion topVer = loc ? reqTopVer : fut.topologyVersion();
+
+                    if (!allowOverwrite && !topVer.equals(reqTopVer)) {
+                        curFut.onDone(new IgniteCheckedException(
+                            "DataStreamer will retry data transfer at stable topology. " +
+                                "[reqTop=" + reqTopVer + " ,topVer=" + topVer + ", node=local]"));
+                    }
+                    else if (loc || allowOverwrite || fut.isDone()) {
+                        IgniteInternalFuture<Object> callFut = ctx.closure().callLocalSafe(
+                            new DataStreamerUpdateJob(
+                                ctx,
+                                log,
+                                cacheName,
+                                entries,
+                                false,
+                                skipStore,
+                                keepBinary,
+                                rcvr),
+                            false);
+
+                        locFuts.add(callFut);
+
+                        final GridFutureAdapter waitFut = (loc || allowOverwrite) ?
+                            null :
+                            cctx.mvcc().addDataStreamerFuture(topVer);
+
+                        callFut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                            @Override public void apply(IgniteInternalFuture<Object> t) {
+                                try {
+                                    boolean rmv = locFuts.remove(t);
+
+                                    assert rmv;
+
+                                    curFut.onDone(t.get());
+                                }
+                                catch (IgniteCheckedException e) {
+                                    curFut.onDone(e);
+                                }
+                                finally {
+                                    if (waitFut != null)
+                                        waitFut.onDone();
+                                }
+                            }
+                        });
+                    }
+                    else {
+                        fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                            @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) {
+                                localUpdate(entries, reqTopVer, curFut);
+                            }
+                        });
+                    }
+                }
+                finally {
+                    if (!loc && !allowOverwrite)
+                        cctx.topology().readUnlock();
+                }
+            }
+            catch (Throwable ex) {
+                curFut.onDone(new IgniteCheckedException("DataStreamer data handling failed.", ex));
+            }
+        }
+
+        /**
          * @param entries Entries to submit.
          * @param topVer Topology version.
          * @param curFut Current future.
+         * @param remap Remapping flag.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
         private void submit(final Collection<DataStreamerEntry> entries,
             @Nullable AffinityTopologyVersion topVer,
-            final GridFutureAdapter<Object> curFut)
+            final GridFutureAdapter<Object> curFut,
+            boolean remap)
             throws IgniteInterruptedCheckedException {
             assert entries != null;
             assert !entries.isEmpty();
             assert curFut != null;
 
-            try {
-                incrementActiveTasks();
-            }
-            catch (IgniteDataStreamerTimeoutException e) {
-                curFut.onDone(e);
-                throw e;
+            if (!remap) {
+                try {
+                    incrementActiveTasks();
+                }
+                catch (IgniteDataStreamerTimeoutException e) {
+                    curFut.onDone(e);
+
+                    throw e;
+                }
             }
 
             IgniteInternalFuture<Object> fut;
@@ -1318,27 +1622,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             if (plc == null)
                 plc = PUBLIC_POOL;
 
-            if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL) {
-                fut = ctx.closure().callLocalSafe(
-                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, keepBinary, rcvr), false);
-
-                locFuts.add(fut);
-
-                fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
-                    @Override public void apply(IgniteInternalFuture<Object> t) {
-                        try {
-                            boolean rmv = locFuts.remove(t);
-
-                            assert rmv;
-
-                            curFut.onDone(t.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            curFut.onDone(e);
-                        }
-                    }
-                });
-            }
+            if (isLocNode && plc == GridIoPolicy.PUBLIC_POOL)
+                localUpdate(entries, topVer, curFut);
             else {
                 try {
                     for (DataStreamerEntry e : entries) {
@@ -1466,8 +1751,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /**
          * @param res Response.
+         * @param nodeId Node id.
          */
-        void onResponse(DataStreamerResponse res) {
+        void onResponse(DataStreamerResponse res, UUID nodeId) {
             if (log.isDebugEnabled())
                 log.debug("Received data load response: " + res);
 
@@ -1488,9 +1774,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 try {
                     GridPeerDeployAware jobPda0 = jobPda;
 
-                    err = U.unmarshal(ctx,
-                        errBytes,
-                        U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config()));
+                    err = new IgniteCheckedException("DataStreamer request failed [node=" + nodeId + "]",
+                        (Throwable)U.unmarshal(ctx,
+                            errBytes,
+                            U.resolveClassLoader(jobPda0 != null ? jobPda0.classLoader() : null, ctx.config())));
                 }
                 catch (IgniteCheckedException e) {
                     f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
@@ -1613,7 +1900,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /**
      * Isolated receiver which only loads entry initial value.
      */
-    private static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>,
+    protected static class IsolatedUpdater implements StreamReceiver<KeyCacheObject, CacheObject>,
         DataStreamerCacheUpdaters.InternalUpdater {
         /** */
         private static final long serialVersionUID = 0L;
@@ -1630,7 +1917,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             GridCacheContext cctx = internalCache.context();
 
-            AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion topVer = cctx.isLocal() ?
+                cctx.affinity().affinityTopologyVersion() :
+                cctx.topology().topologyVersion();
 
             GridCacheVersion ver = cctx.versions().isolatedStreamerVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
index 745619a..ce6783a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLogThrottle.java
@@ -72,7 +72,21 @@ public class GridLogThrottle {
     public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg) {
         assert !F.isEmpty(msg);
 
-        log(log, e, msg, null, LogLevel.ERROR, false);
+        log(log, e, msg, null, LogLevel.ERROR, false, false);
+    }
+
+    /**
+     * Logs error if needed.
+     *
+     * @param log Logger.
+     * @param e Error (optional).
+     * @param msg Message.
+     * @param byMessage Errors group by message, not by tuple(error, msg).
+     */
+    public static void error(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean byMessage) {
+        assert !F.isEmpty(msg);
+
+        log(log, e, msg, null, LogLevel.ERROR, false, byMessage);
     }
 
     /**
@@ -85,7 +99,7 @@ public class GridLogThrottle {
     public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg) {
         assert !F.isEmpty(msg);
 
-        log(log, e, msg, null, LogLevel.WARN, false);
+        log(log, e, msg, null, LogLevel.WARN, false, false);
     }
 
     /**
@@ -99,7 +113,7 @@ public class GridLogThrottle {
     public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String msg, boolean quite) {
         assert !F.isEmpty(msg);
 
-        log(log, e, msg, null, LogLevel.WARN, quite);
+        log(log, e, msg, null, LogLevel.WARN, quite, false);
     }
 
     /**
@@ -113,7 +127,7 @@ public class GridLogThrottle {
     public static void warn(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg) {
         assert !F.isEmpty(longMsg);
 
-        log(log, e, longMsg, shortMsg, LogLevel.WARN, false);
+        log(log, e, longMsg, shortMsg, LogLevel.WARN, false, false);
     }
 
     /**
@@ -126,7 +140,7 @@ public class GridLogThrottle {
     public static void info(@Nullable IgniteLogger log, String msg, boolean quite) {
         assert !F.isEmpty(msg);
 
-        log(log, null, msg, null, LogLevel.INFO, quite);
+        log(log, null, msg, null, LogLevel.INFO, quite, false);
     }
 
     /**
@@ -154,14 +168,15 @@ public class GridLogThrottle {
      * @param longMsg Long message (or just message).
      * @param shortMsg Short message for quite logging.
      * @param level Level where messages should appear.
+     * @param byMessage Errors group by message, not by tuple(error, msg).
      */
     @SuppressWarnings({"RedundantTypeArguments"})
     private static void log(@Nullable IgniteLogger log, @Nullable Throwable e, String longMsg, @Nullable String shortMsg,
-        LogLevel level, boolean quiet) {
+        LogLevel level, boolean quiet, boolean byMessage) {
         assert !F.isEmpty(longMsg);
 
         IgniteBiTuple<Class<? extends Throwable>, String> tup =
-            e != null ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
+            e != null && !byMessage ? F.<Class<? extends Throwable>, String>t(e.getClass(), e.getMessage()) :
                 F.<Class<? extends Throwable>, String>t(null, longMsg);
 
         while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
index 5bd6074..c92ea9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDynamicStopSelfTest.java
@@ -78,27 +78,37 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
         IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
             /** {@inheritDoc} */
             @Override public Object call() throws Exception {
-                try (IgniteDataStreamer<Integer, Integer> str = ignite(0).dataStreamer(null)) {
-                    str.allowOverwrite(allowOverwrite);
-
-                    int i = 0;
-
-                    while (!stop.get()) {
-                        str.addData(i % 10_000, i).listen(new CI1<IgniteFuture<?>>() {
-                            @Override public void apply(IgniteFuture<?> f) {
-                                try {
-                                    f.get();
-                                }
-                                catch (CacheException ignore) {
-                                    // This may be debugged.
-                                }
+                while (!stop.get()) {
+                    try (IgniteDataStreamer<Integer, Integer> str = ignite(0).dataStreamer(null)) {
+                        str.allowOverwrite(allowOverwrite);
+
+                        int i = 0;
+
+                        while (!stop.get()) {
+                            try {
+                                str.addData(i % 10_000, i).listen(new CI1<IgniteFuture<?>>() {
+                                    @Override public void apply(IgniteFuture<?> f) {
+                                        try {
+                                            f.get();
+                                        }
+                                        catch (CacheException ignore) {
+                                            // This may be debugged.
+                                        }
+                                    }
+                                });
+                            }
+                            catch (IllegalStateException ignored) {
+                                break;
                             }
-                        });
 
-                        if (i > 0 && i % 10000 == 0)
-                            info("Added: " + i);
+                            if (i > 0 && i % 10000 == 0)
+                                info("Added: " + i);
 
-                        i++;
+                            i++;
+                        }
+                    }
+                    catch (IllegalStateException | CacheException ignored) {
+                        // This may be debugged.
                     }
                 }
 
@@ -114,6 +124,8 @@ public class IgniteCacheDynamicStopSelfTest extends GridCommonAbstractTest {
             Thread.sleep(500);
 
             ignite(0).createCache(ccfg);
+
+            Thread.sleep(1000);
         }
         finally {
             stop.set(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
index 9da6cf7..0801691 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.cache.Cache;
 import javax.cache.configuration.FactoryBuilder;
@@ -28,32 +31,47 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
 /**
  * Tests for cache data loading during simultaneous grids start.
  */
-public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractTest {
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-500");
-    }
-
+public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractTest implements Serializable {
     /** Grids count */
     private static int GRIDS_CNT = 5;
 
     /** Keys count */
     private static int KEYS_CNT = 1_000_000;
 
+    /** Client. */
+    private volatile boolean client;
+
+    /** Config. */
+    private volatile boolean configured;
+
+    /** Allow override. */
+    protected volatile boolean allowOverwrite;
+
+    /** Restarts. */
+    protected volatile boolean restarts;
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -67,7 +85,24 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
 
         ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestCacheStoreAdapter()));
 
-        cfg.setCacheConfiguration(ccfg);
+        if (getTestGridName(0).equals(gridName)) {
+            if (client)
+                cfg.setClientMode(true);
+
+            if (configured)
+                cfg.setCacheConfiguration(ccfg);
+        }
+        else
+            cfg.setCacheConfiguration(ccfg);
+
+        if (!configured)
+            ccfg.setNodeFilter(new P1<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    String name = node.attribute(ATTR_GRID_NAME).toString();
+
+                    return !getTestGridName(0).equals(name);
+                }
+            });
 
         return cfg;
     }
@@ -81,22 +116,35 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
      * @throws Exception if failed
      */
     public void testLoadCacheWithDataStreamer() throws Exception {
-        IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
-            @Override public void apply(Ignite grid) {
-                try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) {
-                    for (int i = 0; i < KEYS_CNT; i++)
-                        dataStreamer.addData(i, Integer.toString(i));
+        configured = true;
+
+        try {
+            IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
+                @Override public void apply(Ignite grid) {
+                    try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) {
+                        dataStreamer.allowOverwrite(allowOverwrite);
+
+                        for (int i = 0; i < KEYS_CNT; i++)
+                            dataStreamer.addData(i, Integer.toString(i));
+                    }
+
+                    log.info("Data loaded.");
                 }
-            }
-        };
+            };
 
-        loadCache(f);
+            loadCache(f);
+        }
+        finally {
+            configured = false;
+        }
     }
 
     /**
      * @throws Exception if failed
      */
     public void testLoadCacheFromStore() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-4210");
+
         loadCache(new IgniteInClosure<Ignite>() {
             @Override public void apply(Ignite grid) {
                 grid.cache(null).loadCache(null);
@@ -105,12 +153,177 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
     }
 
     /**
+     * @throws Exception if failed
+     */
+    public void testLoadCacheWithDataStreamerSequentialClient() throws Exception {
+        client = true;
+
+        try {
+            loadCacheWithDataStreamerSequential();
+        }
+        finally {
+            client = false;
+        }
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    public void testLoadCacheWithDataStreamerSequentialClientWithConfig() throws Exception {
+        client = true;
+        configured = true;
+
+        try {
+            loadCacheWithDataStreamerSequential();
+        }
+        finally {
+            client = false;
+            configured = false;
+        }
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    public void testLoadCacheWithDataStreamerSequential() throws Exception {
+        loadCacheWithDataStreamerSequential();
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    public void testLoadCacheWithDataStreamerSequentialWithConfigAndRestarts() throws Exception {
+        restarts = true;
+        configured = true;
+
+        try {
+            loadCacheWithDataStreamerSequential();
+        }
+        finally {
+            restarts = false;
+            configured = false;
+        }
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    public void testLoadCacheWithDataStreamerSequentialWithConfig() throws Exception {
+        configured = true;
+
+        try {
+            loadCacheWithDataStreamerSequential();
+        }
+        finally {
+            configured = false;
+        }
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    private void loadCacheWithDataStreamerSequential() throws Exception {
+        startGrid(1);
+
+        Ignite g0 = startGrid(0);
+
+        IgniteInternalFuture<Object> restartFut = runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (restarts) {
+                    stopGrid(1);
+
+                    startGrid(1);
+
+                    U.sleep(100);
+                }
+
+                return null;
+            }
+        });
+
+        IgniteInternalFuture<Object> fut = runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 2; i < GRIDS_CNT; i++)
+                    startGrid(i);
+
+                return null;
+            }
+        });
+
+        final HashSet<IgniteFuture> set = new HashSet<>();
+
+        IgniteInClosure<Ignite> f = new IgniteInClosure<Ignite>() {
+            @Override public void apply(Ignite grid) {
+                try (IgniteDataStreamer<Integer, String> dataStreamer = grid.dataStreamer(null)) {
+                    dataStreamer.allowOverwrite(allowOverwrite);
+
+                    for (int i = 0; i < KEYS_CNT; i++) {
+                        set.add(dataStreamer.addData(i, "Data"));
+
+                        if (i % 100000 == 0)
+                            log.info("Streaming " + i + "'th entry.");
+                    }
+                }
+            }
+        };
+
+        f.apply(g0);
+
+        log.info("Data loaded.");
+
+        restarts = false;
+
+        fut.get();
+        restartFut.get();
+
+        for (IgniteFuture res : set)
+            assertNull(res.get());
+
+        IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+        long size = cache.size(CachePeekMode.PRIMARY);
+
+        if (size != KEYS_CNT) {
+            Set<Integer> failedKeys = new LinkedHashSet<>();
+
+            for (int i = 0; i < KEYS_CNT; i++)
+                if (!cache.containsKey(i)) {
+                    log.info("Actual cache size: " + size);
+
+                    for (Ignite ignite : G.allGrids()) {
+                        IgniteEx igniteEx = (IgniteEx)ignite;
+
+                        log.info("Missed key info:" +
+                            igniteEx.localNode().id() +
+                            " primary=" +
+                            ignite.affinity(null).isPrimary(igniteEx.localNode(), i) +
+                            " backup=" +
+                            ignite.affinity(null).isBackup(igniteEx.localNode(), i) +
+                            " local peek=" +
+                            ignite.cache(null).localPeek(i, CachePeekMode.ONHEAP));
+                    }
+
+                    for (int j = i; j < i + 10000; j++) {
+                        if (!cache.containsKey(j))
+                            failedKeys.add(j);
+                    }
+
+                    break;
+                }
+
+            assert failedKeys.isEmpty() : "Some failed keys: " + failedKeys.toString();
+        }
+
+        assertCacheSize();
+    }
+
+    /**
      * Loads cache using closure and asserts cache size.
      *
      * @param f cache loading closure
      * @throws Exception if failed
      */
-    private void loadCache(IgniteInClosure<Ignite> f) throws Exception {
+    protected void loadCache(IgniteInClosure<Ignite> f) throws Exception {
         Ignite g0 = startGrid(0);
 
         IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Ignite>() {
@@ -130,17 +343,17 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT
     }
 
     /** Asserts cache size. */
-    private void assertCacheSize() {
+    protected void assertCacheSize() {
         IgniteCache<Integer, String> cache = grid(0).cache(null);
 
-        assertEquals(KEYS_CNT, cache.size(CachePeekMode.PRIMARY));
+        assertEquals("Data lost.", KEYS_CNT, cache.size(CachePeekMode.PRIMARY));
 
         int total = 0;
 
         for (int i = 0; i < GRIDS_CNT; i++)
             total += grid(i).cache(null).localSize(CachePeekMode.PRIMARY);
 
-        assertEquals(KEYS_CNT, total);
+        assertEquals("Data lost.", KEYS_CNT, total);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java
new file mode 100644
index 0000000..c9cd9fa
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTestAllowOverwrite.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+/**
+ *
+ */
+public class CacheLoadingConcurrentGridStartSelfTestAllowOverwrite extends CacheLoadingConcurrentGridStartSelfTest {
+    /**
+     * Default constructor.
+     */
+    public CacheLoadingConcurrentGridStartSelfTestAllowOverwrite() {
+        allowOverwrite = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
index 9fedc35..0f8ae29 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java
@@ -29,9 +29,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
@@ -194,7 +194,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest {
 
             assert false;
         }
-        catch (IgniteCheckedException e) {
+        catch (CacheException e) {
             // Cannot load local cache configured remotely.
             info("Caught expected exception: " + e);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7499828/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
index 0c6686f..a6a9f54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -22,13 +22,17 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheServerNotFoundException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -50,6 +54,16 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
     /** Started grid counter. */
     private static int cnt;
 
+    /** No nodes filter. */
+    private static volatile boolean noNodesFilter;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -72,88 +86,149 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
-        try {
-            startGrids(5);
+        startGrids(5);
 
-            final CyclicBarrier barrier = new CyclicBarrier(2);
+        final CyclicBarrier barrier = new CyclicBarrier(2);
 
-            multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    U.awaitQuiet(barrier);
+        multithreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                U.awaitQuiet(barrier);
 
-                    G.stopAll(true);
+                G.stopAll(true);
 
-                    return null;
-                }
-            }, 1);
+                return null;
+            }
+        }, 1);
 
-            Ignite g4 = grid(4);
+        Ignite g4 = grid(4);
 
-            IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
+        IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
 
-            dataLdr.perNodeBufferSize(32);
+        dataLdr.perNodeBufferSize(32);
 
-            for (int i = 0; i < 100000; i += 2) {
-                dataLdr.addData(i, i);
-                dataLdr.removeData(i + 1);
-            }
+        for (int i = 0; i < 100000; i += 2) {
+            dataLdr.addData(i, i);
+            dataLdr.removeData(i + 1);
+        }
 
-            U.awaitQuiet(barrier);
+        U.awaitQuiet(barrier);
 
-            info("Closing data streamer.");
+        info("Closing data streamer.");
 
-            try {
-                dataLdr.close(true);
-            }
-            catch (IllegalStateException ignore) {
-                // This is ok to ignore this exception as test is racy by it's nature -
-                // grid is stopping in different thread.
-            }
+        try {
+            dataLdr.close(true);
         }
-        finally {
-            G.stopAll(true);
+        catch (CacheException | IllegalStateException ignore) {
+            // This is ok to ignore this exception as test is racy by it's nature -
+            // grid is stopping in different thread.
         }
     }
 
     /**
      * Data streamer should correctly load entries from HashMap in case of grids with more than one node
-     *  and with GridOptimizedMarshaller that requires serializable.
+     * and with GridOptimizedMarshaller that requires serializable.
      *
      * @throws Exception If failed.
      */
     public void testAddDataFromMap() throws Exception {
-        try {
-            cnt = 0;
+        cnt = 0;
 
-            startGrids(2);
+        startGrids(2);
 
-            Ignite g0 = grid(0);
+        Ignite g0 = grid(0);
 
-            IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
+        IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
 
-            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+        Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
 
-            for (int i = 0; i < KEYS_COUNT; i ++)
-                map.put(i, String.valueOf(i));
+        for (int i = 0; i < KEYS_COUNT; i++)
+            map.put(i, String.valueOf(i));
 
-            dataLdr.addData(map);
+        dataLdr.addData(map);
 
-            dataLdr.close();
+        dataLdr.close();
 
-            Random rnd = new Random();
+        Random rnd = new Random();
 
-            IgniteCache<Integer, String> c = g0.cache(null);
+        IgniteCache<Integer, String> c = g0.cache(null);
 
-            for (int i = 0; i < KEYS_COUNT; i ++) {
-                Integer k = rnd.nextInt(KEYS_COUNT);
+        for (int i = 0; i < KEYS_COUNT; i++) {
+            Integer k = rnd.nextInt(KEYS_COUNT);
 
-                String v = c.get(k);
+            String v = c.get(k);
+
+            assertEquals(k.toString(), v);
+        }
+    }
+
+    /**
+     * Test logging on {@code DataStreamer.addData()} method when cache have no data nodes
+     *
+     * @throws Exception If fail.
+     */
+    public void testNoDataNodesOnClose() throws Exception {
+        boolean failed = false;
+
+        cnt = 0;
+
+        noNodesFilter = true;
+
+        try {
+            Ignite ignite = startGrid(1);
 
-                assertEquals(k.toString(), v);
+            try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) {
+                streamer.addData(1, "1");
+            }
+            catch (CacheException ex) {
+                failed = true;
             }
         }
         finally {
-            G.stopAll(true);
+            noNodesFilter = false;
+
+            assertTrue(failed);
+        }
+    }
+
+    /**
+     * Test logging on {@code DataStreamer.addData()} method when cache have no data nodes
+     *
+     * @throws Exception If fail.
+     */
+    public void testNoDataNodesOnFlush() throws Exception {
+        boolean failed = false;
+
+        cnt = 0;
+
+        noNodesFilter = true;
+
+        try {
+            Ignite ignite = startGrid(1);
+
+            IgniteFuture fut = null;
+
+            try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(null)) {
+                fut = streamer.addData(1, "1");
+
+                streamer.flush();
+            }
+            catch (IllegalStateException ex) {
+                try {
+                    fut.get();
+
+                    fail("DataStreamer ignores failed streaming.");
+                }
+                catch (CacheServerNotFoundException ignored) {
+                    // No-op.
+                }
+
+                failed = true;
+            }
+        }
+        finally {
+            noNodesFilter = false;
+
+            assertTrue(failed);
         }
     }
 
@@ -169,6 +244,9 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
         cacheCfg.setBackups(1);
         cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
 
+        if (noNodesFilter)
+            cacheCfg.setNodeFilter(F.alwaysFalse());
+
         return cacheCfg;
     }
 


Mime
View raw message