ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/50] [abbrv] incubator-ignite git commit: renaming wip
Date Thu, 12 Mar 2015 17:21:02 GMT
renaming wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b2c679e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b2c679e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b2c679e5

Branch: refs/heads/ignite-333
Commit: b2c679e5e6e0fcc34cdfdf6250534979251d74ec
Parents: 0c9f8eb
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Wed Mar 11 17:45:58 2015 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Wed Mar 11 17:45:58 2015 +0300

----------------------------------------------------------------------
 .../datagrid/CacheDataStreamerExample.java      |    2 +-
 .../ignite/codegen/MessageCodeGenerator.java    |    2 +-
 .../org/apache/ignite/IgniteDataStreamer.java   |   22 +-
 .../ignite/internal/GridKernalContext.java      |    2 +-
 .../ignite/internal/GridKernalContextImpl.java  |   10 +-
 .../org/apache/ignite/internal/GridTopic.java   |    2 +-
 .../apache/ignite/internal/IgniteKernal.java    |    2 +-
 .../communication/GridIoMessageFactory.java     |    6 +-
 .../processors/cache/GridCacheAdapter.java      |   14 +-
 .../GridDistributedCacheAdapter.java            |    8 +-
 .../version/GridCacheRawVersionedEntry.java     |    2 +-
 .../datastream/DataStreamerCacheUpdaters.java   |  206 +++
 .../datastream/DataStreamerEntry.java           |  170 +++
 .../datastream/DataStreamerFuture.java          |   60 +
 .../processors/datastream/DataStreamerImpl.java | 1405 ++++++++++++++++++
 .../datastream/DataStreamerProcessor.java       |  307 ++++
 .../datastream/DataStreamerRequest.java         |  451 ++++++
 .../datastream/DataStreamerResponse.java        |  166 +++
 .../datastream/DataStreamerUpdateJob.java       |  150 ++
 .../datastream/GridDataLoadRequest.java         |  451 ------
 .../datastream/GridDataLoadResponse.java        |  166 ---
 .../datastream/IgniteDataLoaderEntry.java       |  170 ---
 .../IgniteDataStreamerCacheUpdaters.java        |  206 ---
 .../datastream/IgniteDataStreamerFuture.java    |   69 -
 .../datastream/IgniteDataStreamerImpl.java      | 1405 ------------------
 .../datastream/IgniteDataStreamerProcessor.java |  307 ----
 .../datastream/IgniteDataStreamerUpdateJob.java |  150 --
 .../dr/IgniteDrDataStreamerCacheUpdater.java    |    2 +-
 .../processors/igfs/IgfsDataManager.java        |    7 +-
 ...idCachePartitionedHitsAndMissesSelfTest.java |    2 +-
 .../datastream/DataStreamerImplSelfTest.java    |  205 +++
 .../DataStreamerProcessorSelfTest.java          |  970 ++++++++++++
 .../IgniteDataStreamerImplSelfTest.java         |  205 ---
 .../IgniteDataStreamerPerformanceTest.java      |    2 +-
 .../IgniteDataStreamerProcessorSelfTest.java    |  970 ------------
 .../ignite/testsuites/IgniteCacheTestSuite.java |    5 +-
 pom.xml                                         |    4 +-
 37 files changed, 4138 insertions(+), 4145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java
index 73a36a6..7a4e72d 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java
@@ -66,7 +66,7 @@ public class CacheDataStreamerExample {
             try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) {
                 // Configure loader.
                 stmr.perNodeBufferSize(1024);
-                stmr.perNodeParallelStreamOperations(8);
+                stmr.perNodeParallelOperations(8);
 
                 for (int i = 0; i < ENTRY_COUNT; i++) {
                     stmr.addData(i, Integer.toString(i));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 090417b..07200f5 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -140,7 +140,7 @@ public class MessageCodeGenerator {
 
         MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
 
-        gen.generateAndWrite(IgniteDataLoaderEntry.class);
+        gen.generateAndWrite(DataStreamerEntry.class);
 
 //        gen.generateAndWrite(GridDistributedLockRequest.class);
 //        gen.generateAndWrite(GridDistributedLockResponse.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index c7758fe..c6f28bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -52,7 +52,7 @@ import java.util.*;
  *      value.
  *  </li>
  *  <li>
- *      {@link #perNodeParallelStreamOperations(int)} - sometimes data may be added
+ *      {@link #perNodeParallelOperations(int)} - sometimes data may be added
  *      to the data streamer via {@link #addData(Object, Object)} method faster than it can
  *      be put in cache. In this case, new buffered stream messages are sent to remote nodes
  *      before responses from previous ones are received. This could cause unlimited heap
@@ -102,7 +102,8 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
     public String cacheName();
 
     /**
-     * Gets flag value indicating that this data streamer assumes that there are no other concurrent updates to the cache.
+     * Gets flag value indicating that this data streamer assumes that
+     * there are no other concurrent updates to the cache.
      * Default is {@code false}.
      *
      * @return Flag value.
@@ -110,7 +111,8 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
     public boolean allowOverwrite();
 
     /**
-     * Sets flag indicating that this data streamer should assume that there are no other concurrent updates to the cache.
+     * Sets flag indicating that this data streamer should assume
+     * that there are no other concurrent updates to the cache.
      * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method.
      * Default is {@code false}. When this flag is set, updates will not be propagated to the cache store.
      *
@@ -154,14 +156,14 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
     public void perNodeBufferSize(int bufSize);
 
     /**
-     * Gets maximum number of parallel stream operations for a single node.
+     * Gets maximum number of parallel update operations for a single node.
      *
      * @return Maximum number of parallel stream operations for a single node.
      */
-    public int perNodeParallelStreamOperations();
+    public int perNodeParallelOperations();
 
     /**
-     * Sets maximum number of parallel stream operations for a single node.
+     * Sets maximum number of parallel update operations for a single node.
      * <p>
      * This method should be called prior to {@link #addData(Object, Object)} call.
      * <p>
@@ -169,7 +171,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
      *
      * @param parallelOps Maximum number of parallel stream operations for a single node.
      */
-    public void perNodeParallelStreamOperations(int parallelOps);
+    public void perNodeParallelOperations(int parallelOps);
 
     /**
      * Gets automatic flush frequency. Essentially, this is the time after which the
@@ -284,7 +286,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
      * @throws IgniteInterruptedException If thread has been interrupted.
      * @throws IllegalStateException If grid has been concurrently stopped or
      *      {@link #close(boolean)} has already been called on streamer.
-     * @see #allowOverwrite()      
+     * @see #allowOverwrite()
      */
     public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
         IllegalStateException;
@@ -298,7 +300,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
      * the streamer.
      * <p>
      * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default)
-     * then data streamer will not overwrite existing cache entries for better performance 
+     * then data streamer will not overwrite existing cache entries for better performance
      * (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true})
      *
      * @param entries Collection of entries to be streamed.
@@ -318,7 +320,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
      * the streamer.
      * <p>
      * Note: if {@link IgniteDataStreamer#allowOverwrite()} set to {@code false} (by default)
-     * then data streamer will not overwrite existing cache entries for better performance 
+     * then data streamer will not overwrite existing cache entries for better performance
      * (to change, set {@link IgniteDataStreamer#allowOverwrite(boolean)} to {@code true})
      *
      * @param entries Map to be streamed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 48a0ee2..8b6eaa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -247,7 +247,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      *
      * @return Data streamer processor.
      */
-    public <K, V> IgniteDataStreamerProcessor<K, V> dataStream();
+    public <K, V> DataStreamerProcessor<K, V> dataStream();
 
     /**
      * Gets file system processor.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 506bd48..c066c94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -200,7 +200,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringInclude
-    private IgniteDataStreamerProcessor dataLdrProc;
+    private DataStreamerProcessor dataLdrProc;
 
     /** */
     @GridToStringInclude
@@ -457,8 +457,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
             affProc = (GridAffinityProcessor)comp;
         else if (comp instanceof GridRestProcessor)
             restProc = (GridRestProcessor)comp;
-        else if (comp instanceof IgniteDataStreamerProcessor)
-            dataLdrProc = (IgniteDataStreamerProcessor)comp;
+        else if (comp instanceof DataStreamerProcessor)
+            dataLdrProc = (DataStreamerProcessor)comp;
         else if (comp instanceof IgfsProcessorAdapter)
             igfsProc = (IgfsProcessorAdapter)comp;
         else if (comp instanceof GridOffHeapProcessor)
@@ -671,8 +671,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public <K, V> IgniteDataStreamerProcessor<K, V> dataStream() {
-        return (IgniteDataStreamerProcessor<K, V>)dataLdrProc;
+    @Override public <K, V> DataStreamerProcessor<K, V> dataStream() {
+        return (DataStreamerProcessor<K, V>)dataLdrProc;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 06f1653..ba3b8b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -69,7 +69,7 @@ public enum GridTopic {
     TOPIC_IGFS,
 
     /** */
-    TOPIC_DATALOAD,
+    TOPIC_DATASTREAM,
 
     /** */
     TOPIC_STREAM,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 2806578..92e4ced 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -755,7 +755,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             startProcessor(new GridTaskProcessor(ctx));
             startProcessor((GridProcessor)SCHEDULE.createOptional(ctx));
             startProcessor(new GridRestProcessor(ctx));
-            startProcessor(new IgniteDataStreamerProcessor(ctx));
+            startProcessor(new DataStreamerProcessor(ctx));
             startProcessor(new GridStreamProcessor(ctx));
             startProcessor((GridProcessor) IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
             startProcessor(new GridContinuousProcessor(ctx));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7789fba..7be89cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -385,12 +385,12 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 62:
-                msg = new GridDataLoadRequest();
+                msg = new DataStreamerRequest();
 
                 break;
 
             case 63:
-                msg = new GridDataLoadResponse();
+                msg = new DataStreamerResponse();
 
                 break;
 
@@ -525,7 +525,7 @@ public class GridIoMessageFactory implements MessageFactory {
                 break;
 
             case 95:
-                msg = new IgniteDataLoaderEntry();
+                msg = new DataStreamerEntry();
 
                 break;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 581a42f..ff5e58c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3732,7 +3732,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());
+            DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());
 
             try {
                 ldr.updater(new IgniteDrDataStreamerCacheUpdater());
@@ -3883,18 +3883,18 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @throws IgniteCheckedException If failed.
      */
     private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
-        try (final IgniteDataStreamerImpl<KeyCacheObject, CacheObject> ldr =
+        try (final DataStreamerImpl<KeyCacheObject, CacheObject> ldr =
                  ctx.kernalContext().<KeyCacheObject, CacheObject>dataStream().dataStreamer(ctx.namex())) {
             ldr.allowOverwrite(true);
             ldr.skipStore(true);
 
-            final Collection<IgniteDataLoaderEntry> col = new ArrayList<>(ldr.perNodeBufferSize());
+            final Collection<DataStreamerEntry> col = new ArrayList<>(ldr.perNodeBufferSize());
 
             Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys);
 
             ctx.store().loadAllFromStore(null, keys0, new CIX2<KeyCacheObject, Object>() {
                 @Override public void applyx(KeyCacheObject key, Object val) {
-                    col.add(new IgniteDataLoaderEntry(key, ctx.toCacheObject(val)));
+                    col.add(new DataStreamerEntry(key, ctx.toCacheObject(val)));
 
                     if (col.size() == ldr.perNodeBufferSize()) {
                         ldr.addDataInternal(col);
@@ -3926,7 +3926,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys);
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());
+            DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.namex());
 
             try {
                 ldr.updater(new IgniteDrDataStreamerCacheUpdater());
@@ -5882,7 +5882,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final Collection<GridCacheRawVersionedEntry> col;
 
         /** */
-        final IgniteDataStreamerImpl<K, V> ldr;
+        final DataStreamerImpl<K, V> ldr;
 
         /** */
         final ExpiryPolicy plc;
@@ -5893,7 +5893,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
          * @param plc Optional expiry policy.
          */
         private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
-            IgniteDataStreamerImpl<K, V> ldr,
+            DataStreamerImpl<K, V> ldr,
             @Nullable ExpiryPolicy plc) {
             this.p = p;
             this.ldr = ldr;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 099bb24..3d85590 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -283,11 +283,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 else
                     dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
 
-                try (IgniteDataStreamerImpl<KeyCacheObject, Object> dataLdr =
-                         (IgniteDataStreamerImpl)ignite.dataStreamer(cacheName)) {
-                    ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
+                try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
+                         (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
+                    ((DataStreamerImpl)dataLdr).maxRemapCount(0);
 
-                    dataLdr.updater(IgniteDataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
+                    dataLdr.updater(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 
                     for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
                         if (!locPart.isEmpty() && locPart.primary(topVer)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
index b0291b3..9e73e1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
@@ -32,7 +32,7 @@ import java.nio.*;
 /**
  * Raw versioned entry.
  */
-public class GridCacheRawVersionedEntry<K, V> extends IgniteDataLoaderEntry implements
+public class GridCacheRawVersionedEntry<K, V> extends DataStreamerEntry implements
     GridCacheVersionedEntry<K, V>, GridCacheVersionable, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..64f0851
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerCacheUpdaters.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class DataStreamerCacheUpdaters {
+    /** */
+    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
+
+    /**
+     * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
+     * is not the best.
+     *
+     * @return Single updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+        return INDIVIDUAL;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
+     * updated concurrently. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+        return BATCHED;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
+     * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched sorted updater.
+     */
+    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
+        return BATCHED_SORTED;
+    }
+
+    /**
+     * Updates cache.
+     *
+     * @param cache Cache.
+     * @param rmvCol Keys to remove.
+     * @param putMap Entries to put.
+     * @throws IgniteException If failed.
+     */
+    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
+        Map<K, V> putMap) {
+        assert rmvCol != null || putMap != null;
+
+        // Here we assume that there are no key duplicates, so the following calls are valid.
+        if (rmvCol != null)
+            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+        if (putMap != null)
+            cache.putAll(putMap);
+    }
+
+    /**
+     * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
+     */
+    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null)
+                    cache.remove(key);
+                else
+                    cache.put(key, val);
+            }
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock prone.
+     */
+    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new HashSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new HashMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock prone.
+     */
+    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V>, InternalUpdater {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key instanceof Comparable;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new TreeSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new TreeMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Marker interface for updaters which do not need to unwrap cache objects.
+     */
+    public static interface InternalUpdater {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerEntry.java
new file mode 100644
index 0000000..2aa11e5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerEntry.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class DataStreamerEntry implements Map.Entry<KeyCacheObject, CacheObject>, Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @GridToStringInclude
+    protected KeyCacheObject key;
+
+    /** */
+    @GridToStringInclude
+    protected CacheObject val;
+
+    /**
+     *
+     */
+    public DataStreamerEntry() {
+        // No-op.
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     */
+    public DataStreamerEntry(KeyCacheObject key, CacheObject val) {
+        this.key = key;
+        this.val = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject getKey() {
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject getValue() {
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject setValue(CacheObject val) {
+        CacheObject old = this.val;
+
+        this.val = val;
+
+        return old;
+    }
+
+    /**
+     * @param ctx Cache context.
+     * @return Map entry unwrapping internal key and value.
+     */
+    public <K, V> Map.Entry<K, V> toEntry(final GridCacheContext ctx) {
+        return new Map.Entry<K, V>() {
+            @Override public K getKey() {
+                return key.value(ctx.cacheObjectContext(), false);
+            }
+
+            @Override public V setValue(V val) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override public V getValue() {
+                return val != null ? val.<V>value(ctx.cacheObjectContext(), false) : null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMessage("val", val))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                val = reader.readMessage("val");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 95;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerEntry.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerFuture.java
new file mode 100644
index 0000000..d16887d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerFuture.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * Data streamer future.
+ */
+class DataStreamerFuture extends GridFutureAdapter<Object> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Data loader. */
+    @GridToStringExclude
+    private DataStreamerImpl dataLdr;
+
+    /**
+     * @param dataLdr Data streamer.
+     */
+    DataStreamerFuture(DataStreamerImpl dataLdr) {
+        assert dataLdr != null;
+
+        this.dataLdr = dataLdr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel() throws IgniteCheckedException {
+        if (onCancelled()) {
+            dataLdr.closeEx(true);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerFuture.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2c679e5/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerImpl.java
new file mode 100644
index 0000000..691c74f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/DataStreamerImpl.java
@@ -0,0 +1,1405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.cacheobject.*;
+import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.Map.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Data streamer implementation.
+ */
+@SuppressWarnings("unchecked")
+public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+    /** Isolated updater. */
+    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
+    /** Cache updater. */
+    private Updater<K, V> updater = ISOLATED_UPDATER;
+
+    /** */
+    private byte[] updaterBytes;
+
+    /** Max remap count before issuing an error. */
+    private static final int DFLT_MAX_REMAP_CNT = 32;
+
+    /** Log reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    private static IgniteLogger log;
+
+    /** Cache name ({@code null} for default cache). */
+    private final String cacheName;
+
+
+    /** Per-node buffer size. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+    /** */
+    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+    /** */
+    private long autoFlushFreq;
+
+    /** Mapping. */
+    @GridToStringInclude
+    private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
+
+    /** Discovery listener. */
+    private final GridLocalEventListener discoLsnr;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final IgniteCacheObjectProcessor cacheObjProc;
+
+    /** */
+    private final CacheObjectContext cacheObjCtx;
+
+    /** Communication topic for responses. */
+    private final Object topic;
+
+    /** */
+    private byte[] topicBytes;
+
+    /** {@code True} if data loader has been cancelled. */
+    private volatile boolean cancelled;
+
+    /** Active futures of this data loader. */
+    @GridToStringInclude
+    private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
+
+    /** Closure to remove from active futures. */
+    @GridToStringExclude
+    private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
+        @Override public void apply(IgniteInternalFuture<?> t) {
+            boolean rmv = activeFuts.remove(t);
+
+            assert rmv;
+        }
+    };
+
+    /** Job peer deploy aware. */
+    private volatile GridPeerDeployAware jobPda;
+
+    /** Deployment class. */
+    private Class<?> depCls;
+
+    /** Future to track loading finish. */
+    private final GridFutureAdapter<?> fut;
+
+    /** Public API future to track loading finish. */
+    private final IgniteFuture<?> publicFut;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Closed flag. */
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /** */
+    private volatile long lastFlushTime = U.currentTimeMillis();
+
+    /** */
+    private final DelayQueue<DataStreamerImpl<K, V>> flushQ;
+
+    /** */
+    private boolean skipStore;
+
+    /** */
+    private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+
+    /** Whether a warning at {@link DataStreamerImpl#allowOverwrite()} printed */
+    private static boolean isWarningPrinted;
+
+    /**
+     * @param ctx Grid kernal context.
+     * @param cacheName Cache name.
+     * @param flushQ Flush queue.
+     */
+    public DataStreamerImpl(
+        final GridKernalContext ctx,
+        @Nullable final String cacheName,
+        DelayQueue<DataStreamerImpl<K, V>> flushQ
+    ) {
+        assert ctx != null;
+
+        this.ctx = ctx;
+        this.cacheObjProc = ctx.cacheObjects();
+
+        if (log == null)
+            log = U.logger(ctx, logRef, DataStreamerImpl.class);
+
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+        if (node == null)
+            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+
+        this.cacheObjCtx = ctx.cacheObjects().contextForCache(node, cacheName);
+        this.cacheName = cacheName;
+        this.flushQ = flushQ;
+
+        discoLsnr = new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                UUID id = discoEvt.eventNode().id();
+
+                // Remap regular mappings.
+                final Buffer buf = bufMappings.remove(id);
+
+                if (buf != null) {
+                    // Only async notification is possible since
+                    // discovery thread may be trapped otherwise.
+                    ctx.closure().callLocalSafe(
+                        new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                buf.onNodeLeft();
+
+                                return null;
+                            }
+                        },
+                        true /* system pool */
+                    );
+                }
+            }
+        };
+
+        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        // Generate unique topic for this loader.
+        topic = TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+        ctx.io().addMessageListener(topic, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof DataStreamerResponse;
+
+                DataStreamerResponse res = (DataStreamerResponse)msg;
+
+                if (log.isDebugEnabled())
+                    log.debug("Received data load response: " + res);
+
+                Buffer buf = bufMappings.get(nodeId);
+
+                if (buf != null)
+                    buf.onResponse(res);
+
+                else if (log.isDebugEnabled())
+                    log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
+            }
+        });
+
+        if (log.isDebugEnabled())
+            log.debug("Added response listener within topic: " + topic);
+
+        fut = new DataStreamerFuture(this);
+
+        publicFut = new IgniteFutureImpl<>(fut);
+    }
+
+    /**
+     * @return Cache object context.
+     */
+    public CacheObjectContext cacheObjectContext() {
+        return cacheObjCtx;
+    }
+
+    /**
+     * Enters busy lock.
+     */
+    private void enterBusy() {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Data streamer has been closed.");
+    }
+
+    /**
+     * Leaves busy lock.
+     */
+    private void leaveBusy() {
+        busyLock.leaveBusy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> future() {
+        return publicFut;
+    }
+
+    /**
+     * @return Internal future.
+     */
+    public IgniteInternalFuture<?> internalFuture() {
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deployClass(Class<?> depCls) {
+        this.depCls = depCls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updater(Updater<K, V> updater) {
+        A.notNull(updater, "updater");
+
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allowOverwrite() {
+        boolean allow = updater != ISOLATED_UPDATER;
+
+        if (!allow && !isWarningPrinted) {
+            synchronized (this) {
+                if (!isWarningPrinted) {
+                    log.warning("Data streamer will not overwrite existing cache entries for better performance " +
+                        "(to change, set allowOverwrite to true)");
+
+                    isWarningPrinted = true;
+                }
+            }
+        }
+
+        return allow;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void allowOverwrite(boolean allow) {
+        if (allow == allowOverwrite())
+            return;
+
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+        if (node == null)
+            throw new IgniteException("Failed to get node for cache: " + cacheName);
+
+        updater = allow ? DataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void skipStore(boolean skipStore) {
+        this.skipStore = skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public String cacheName() {
+        return cacheName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int perNodeBufferSize() {
+        return bufSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void perNodeBufferSize(int bufSize) {
+        A.ensure(bufSize > 0, "bufSize > 0");
+
+        this.bufSize = bufSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int perNodeParallelOperations() {
+        return parallelOps;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void perNodeParallelOperations(int parallelOps) {
+        this.parallelOps = parallelOps;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long autoFlushFrequency() {
+        return autoFlushFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void autoFlushFrequency(long autoFlushFreq) {
+        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+
+        long old = this.autoFlushFreq;
+
+        if (autoFlushFreq != old) {
+            this.autoFlushFreq = autoFlushFreq;
+
+            if (autoFlushFreq != 0 && old == 0)
+                flushQ.add(this);
+            else if (autoFlushFreq == 0)
+                flushQ.remove(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+        A.notNull(entries, "entries");
+
+        return addData(entries.entrySet());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+        A.notEmpty(entries, "entries");
+
+        enterBusy();
+
+        try {
+            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+
+            resFut.listen(rmvActiveFut);
+
+            activeFuts.add(resFut);
+
+            Collection<KeyCacheObject> keys = null;
+
+            if (entries.size() > 1) {
+                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+                for (Map.Entry<K, V> entry : entries)
+                    keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey(), true));
+            }
+
+            Collection<? extends DataStreamerEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() {
+                @Override public DataStreamerEntry apply(Entry<K, V> e) {
+                    KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true);
+                    CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true);
+
+                    return new DataStreamerEntry(key, val);
+                }
+            });
+
+            load0(entries0, resFut, keys, 0);
+
+            return new IgniteFutureImpl<>(resFut);
+        }
+        catch (IgniteException e) {
+            return new IgniteFinishedFutureImpl<>(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @return Future.
+     */
+    public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject val) {
+        return addDataInternal(Collections.singleton(new DataStreamerEntry(key, val)));
+    }
+
+    /**
+     * @param key Key.
+     * @return Future.
+     */
+    public IgniteFuture<?> removeDataInternal(KeyCacheObject key) {
+        return addDataInternal(Collections.singleton(new DataStreamerEntry(key, null)));
+    }
+
+    /**
+     * @param entries Entries.
+     * @return Future.
+     */
+    public IgniteFuture<?> addDataInternal(Collection<? extends DataStreamerEntry> entries) {
+        enterBusy();
+
+        GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+
+        try {
+            resFut.listen(rmvActiveFut);
+
+            activeFuts.add(resFut);
+
+            Collection<KeyCacheObject> keys = null;
+
+            if (entries.size() > 1) {
+                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+                for (DataStreamerEntry entry : entries)
+                    keys.add(entry.getKey());
+            }
+
+            load0(entries, resFut, keys, 0);
+
+            return new IgniteFutureImpl<>(resFut);
+        }
+        catch (Throwable e) {
+            resFut.onDone(e);
+
+            if (e instanceof Error)
+                throw e;
+
+            return new IgniteFinishedFutureImpl<>(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+        A.notNull(entry, "entry");
+
+        return addData(F.asList(entry));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(K key, V val) {
+        A.notNull(key, "key");
+
+        KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true);
+        CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
+
+        return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> removeData(K key) {
+        return addData(key, null);
+    }
+
+    /**
+     * @param entries Entries.
+     * @param resFut Result future.
+     * @param activeKeys Active keys.
+     * @param remaps Remaps count.
+     */
+    private void load0(
+        Collection<? extends DataStreamerEntry> entries,
+        final GridFutureAdapter<Object> resFut,
+        @Nullable final Collection<KeyCacheObject> activeKeys,
+        final int remaps
+    ) {
+        assert entries != null;
+
+        Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();
+
+        boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+        for (DataStreamerEntry entry : entries) {
+            List<ClusterNode> nodes;
+
+            try {
+                KeyCacheObject key = entry.getKey();
+
+                assert key != null;
+
+                if (initPda) {
+                    jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
+                        entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
+                        updater);
+
+                    initPda = false;
+                }
+
+                nodes = nodes(key);
+            }
+            catch (IgniteCheckedException e) {
+                resFut.onDone(e);
+
+                return;
+            }
+
+            if (F.isEmpty(nodes)) {
+                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+                    "(no nodes with cache found in topology) [infos=" + entries.size() +
+                    ", cacheName=" + cacheName + ']'));
+
+                return;
+            }
+
+            for (ClusterNode node : nodes) {
+                Collection<DataStreamerEntry> col = mappings.get(node);
+
+                if (col == null)
+                    mappings.put(node, col = new ArrayList<>());
+
+                col.add(entry);
+            }
+        }
+
+        for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
+            final UUID nodeId = e.getKey().id();
+
+            Buffer buf = bufMappings.get(nodeId);
+
+            if (buf == null) {
+                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+                if (old != null)
+                    buf = old;
+            }
+
+            final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+
+            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> t) {
+                    try {
+                        t.get();
+
+                        if (activeKeys != null) {
+                            for (DataStreamerEntry e : entriesForNode)
+                                activeKeys.remove(e.getKey());
+
+                            if (activeKeys.isEmpty())
+                                resFut.onDone();
+                        }
+                        else {
+                            assert entriesForNode.size() == 1;
+
+                            // That has been a single key,
+                            // so complete result future right away.
+                            resFut.onDone();
+                        }
+                    }
+                    catch (IgniteCheckedException e1) {
+                        if (log.isDebugEnabled())
+                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                        if (cancelled) {
+                            resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                                DataStreamerImpl.this, e1));
+                        }
+                        else if (remaps + 1 > maxRemapCnt) {
+                            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+                                + remaps), e1);
+                        }
+                        else
+                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
+                    }
+                }
+            };
+
+            GridFutureAdapter<?> f;
+
+            try {
+                f = buf.update(entriesForNode, lsnr);
+            }
+            catch (IgniteInterruptedCheckedException e1) {
+                resFut.onDone(e1);
+
+                return;
+            }
+
+            if (ctx.discovery().node(nodeId) == null) {
+                if (bufMappings.remove(nodeId, buf))
+                    buf.onNodeLeft();
+
+                if (f != null)
+                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                        "(node has left): " + nodeId));
+            }
+        }
+    }
+
+    /**
+     * @param key Key to map.
+     * @return Nodes to send requests to.
+     * @throws IgniteCheckedException If failed.
+     */
+    private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException {
+        GridAffinityProcessor aff = ctx.affinity();
+
+        return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
+            Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+    }
+
+    /**
+     * Performs flush.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void doFlush() throws IgniteCheckedException {
+        lastFlushTime = U.currentTimeMillis();
+
+        List<IgniteInternalFuture> activeFuts0 = null;
+
+        int doneCnt = 0;
+
+        for (IgniteInternalFuture<?> f : activeFuts) {
+            if (!f.isDone()) {
+                if (activeFuts0 == null)
+                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
+
+                activeFuts0.add(f);
+            }
+            else {
+                f.get();
+
+                doneCnt++;
+            }
+        }
+
+        if (activeFuts0 == null || activeFuts0.isEmpty())
+            return;
+
+        while (true) {
+            Queue<IgniteInternalFuture<?>> q = null;
+
+            for (Buffer buf : bufMappings.values()) {
+                IgniteInternalFuture<?> flushFut = buf.flush();
+
+                if (flushFut != null) {
+                    if (q == null)
+                        q = new ArrayDeque<>(bufMappings.size() * 2);
+
+                    q.add(flushFut);
+                }
+            }
+
+            if (q != null) {
+                assert !q.isEmpty();
+
+                boolean err = false;
+
+                for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to flush buffer: " + e);
+
+                        err = true;
+                    }
+                }
+
+                if (err)
+                    // Remaps needed - flush buffers.
+                    continue;
+            }
+
+            doneCnt = 0;
+
+            for (int i = 0; i < activeFuts0.size(); i++) {
+                IgniteInternalFuture f = activeFuts0.get(i);
+
+                if (f == null)
+                    doneCnt++;
+                else if (f.isDone()) {
+                    f.get();
+
+                    doneCnt++;
+
+                    activeFuts0.set(i, null);
+                }
+                else
+                    break;
+            }
+
+            if (doneCnt == activeFuts0.size())
+                return;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void flush() throws IgniteException {
+        enterBusy();
+
+        try {
+            doFlush();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Flushes every internal buffer if buffer was flushed before passed in
+     * threshold.
+     * <p>
+     * Does not wait for result and does not fail on errors assuming that this method
+     * should be called periodically.
+     */
+    @Override public void tryFlush() throws IgniteInterruptedException {
+        if (!busyLock.enterBusy())
+            return;
+
+        try {
+            for (Buffer buf : bufMappings.values())
+                buf.flush();
+
+            lastFlushTime = U.currentTimeMillis();
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param cancel {@code True} to close with cancellation.
+     * @throws IgniteException If failed.
+     */
+    @Override public void close(boolean cancel) throws IgniteException {
+        try {
+            closeEx(cancel);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * @param cancel {@code True} to close with cancellation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void closeEx(boolean cancel) throws IgniteCheckedException {
+        if (!closed.compareAndSet(false, true))
+            return;
+
+        busyLock.block();
+
+        if (log.isDebugEnabled())
+            log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
+
+        IgniteCheckedException e = null;
+
+        try {
+            // Assuming that no methods are called on this loader after this method is called.
+            if (cancel) {
+                cancelled = true;
+
+                for (Buffer buf : bufMappings.values())
+                    buf.cancelAll();
+            }
+            else
+                doFlush();
+
+            ctx.event().removeLocalEventListener(discoLsnr);
+
+            ctx.io().removeMessageListener(topic);
+        }
+        catch (IgniteCheckedException e0) {
+            e = e0;
+        }
+
+        fut.onDone(null, e);
+
+        if (e != null)
+            throw e;
+    }
+
+    /**
+     * @return {@code true} If the loader is closed.
+     */
+    boolean isClosed() {
+        return fut.isDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        close(false);
+    }
+
+    /**
+     * @return Max remap count.
+     */
+    public int maxRemapCount() {
+        return maxRemapCnt;
+    }
+
+    /**
+     * @param maxRemapCnt New max remap count.
+     */
+    public void maxRemapCount(int maxRemapCnt) {
+        this.maxRemapCnt = maxRemapCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerImpl.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDelay(TimeUnit unit) {
+        return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return Next flush time.
+     */
+    private long nextFlushTime() {
+        return lastFlushTime + autoFlushFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(Delayed o) {
+        return nextFlushTime() > ((DataStreamerImpl)o).nextFlushTime() ? 1 : -1;
+    }
+
+    /**
+     *
+     */
+    private class Buffer {
+        /** Node. */
+        private final ClusterNode node;
+
+        /** Active futures. */
+        private final Collection<IgniteInternalFuture<Object>> locFuts;
+
+        /** Buffered entries. */
+        private List<DataStreamerEntry> entries;
+
+        /** */
+        @GridToStringExclude
+        private GridFutureAdapter<Object> curFut;
+
+        /** Local node flag. */
+        private final boolean isLocNode;
+
+        /** ID generator. */
+        private final AtomicLong idGen = new AtomicLong();
+
+        /** Active futures. */
+        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+        /** */
+        private final Semaphore sem;
+
+        /** Closure to signal on task finish. */
+        @GridToStringExclude
+        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
+            @Override public void apply(IgniteInternalFuture<Object> t) {
+                signalTaskFinished(t);
+            }
+        };
+
+        /**
+         * @param node Node.
+         */
+        Buffer(ClusterNode node) {
+            assert node != null;
+
+            this.node = node;
+
+            locFuts = new GridConcurrentHashSet<>();
+            reqs = new ConcurrentHashMap8<>();
+
+            // Cache local node flag.
+            isLocNode = node.equals(ctx.discovery().localNode());
+
+            entries = newEntries();
+            curFut = new GridFutureAdapter<>();
+            curFut.listen(signalC);
+
+            sem = new Semaphore(parallelOps);
+        }
+
+        /**
+         * @param newEntries Infos.
+         * @param lsnr Listener for the operation future.
+         * @throws IgniteInterruptedCheckedException If failed.
+         * @return Future for operation.
+         */
+        @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+            IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+            List<DataStreamerEntry> entries0 = null;
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+
+                curFut0.listen(lsnr);
+
+                for (DataStreamerEntry entry : newEntries)
+                    entries.add(entry);
+
+                if (entries.size() >= bufSize) {
+                    entries0 = entries;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>();
+                    curFut.listen(signalC);
+                }
+            }
+
+            if (entries0 != null) {
+                submit(entries0, curFut0);
+
+                if (cancelled)
+                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
+            }
+
+            return curFut0;
+        }
+
+        /**
+         * @return Fresh collection with some space for outgrowth.
+         */
+        private List<DataStreamerEntry> newEntries() {
+            return new ArrayList<>((int)(bufSize * 1.2));
+        }
+
+        /**
+         * @return Future if any submitted.
+         *
+         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+         */
+        @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
+            List<DataStreamerEntry> entries0 = null;
+            GridFutureAdapter<Object> curFut0 = null;
+
+            synchronized (this) {
+                if (!entries.isEmpty()) {
+                    entries0 = entries;
+                    curFut0 = curFut;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>();
+                    curFut.listen(signalC);
+                }
+            }
+
+            if (entries0 != null)
+                submit(entries0, curFut0);
+
+            // Create compound future for this flush.
+            GridCompoundFuture<Object, Object> res = null;
+
+            for (IgniteInternalFuture<Object> f : locFuts) {
+                if (res == null)
+                    res = new GridCompoundFuture<>();
+
+                res.add(f);
+            }
+
+            for (IgniteInternalFuture<Object> f : reqs.values()) {
+                if (res == null)
+                    res = new GridCompoundFuture<>();
+
+                res.add(f);
+            }
+
+            if (res != null)
+                res.markInitialized();
+
+            return res;
+        }
+
+        /**
+         * Increments active tasks count.
+         *
+         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+         */
+        private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
+            U.acquire(sem);
+        }
+
+        /**
+         * @param f Future that finished.
+         */
+        private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+            assert f != null;
+
+            sem.release();
+        }
+
+        /**
+         * @param entries Entries to submit.
+         * @param curFut Current future.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut)
+            throws IgniteInterruptedCheckedException {
+            assert entries != null;
+            assert !entries.isEmpty();
+            assert curFut != null;
+
+            incrementActiveTasks();
+
+            IgniteInternalFuture<Object> fut;
+
+            if (isLocNode) {
+                fut = ctx.closure().callLocalSafe(
+                    new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, updater), false);
+
+                locFuts.add(fut);
+
+                fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                    @Override public void apply(IgniteInternalFuture<Object> t) {
+                        try {
+                            boolean rmv = locFuts.remove(t);
+
+                            assert rmv;
+
+                            curFut.onDone(t.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            curFut.onDone(e);
+                        }
+                    }
+                });
+            }
+            else {
+                try {
+                    for (DataStreamerEntry e : entries) {
+                        e.getKey().prepareMarshal(cacheObjCtx);
+
+                        CacheObject val = e.getValue();
+
+                        if (val != null)
+                            val.prepareMarshal(cacheObjCtx);
+                    }
+
+                    if (updaterBytes == null) {
+                        assert updater != null;
+
+                        updaterBytes = ctx.config().getMarshaller().marshal(updater);
+                    }
+
+                    if (topicBytes == null)
+                        topicBytes = ctx.config().getMarshaller().marshal(topic);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to marshal (request will not be sent).", e);
+
+                    return;
+                }
+
+                GridDeployment dep = null;
+                GridPeerDeployAware jobPda0 = null;
+
+                if (ctx.deploy().enabled()) {
+                    try {
+                        jobPda0 = jobPda;
+
+                        assert jobPda0 != null;
+
+                        dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+
+                        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+                        if (cache != null)
+                            cache.context().deploy().onEnter();
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+
+                        return;
+                    }
+
+                    if (dep == null)
+                        U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+                }
+
+                long reqId = idGen.incrementAndGet();
+
+                fut = curFut;
+
+                reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+
+                DataStreamerRequest req = new DataStreamerRequest(
+                    reqId,
+                    topicBytes,
+                    cacheName,
+                    updaterBytes,
+                    entries,
+                    true,
+                    skipStore,
+                    dep != null ? dep.deployMode() : null,
+                    dep != null ? jobPda0.deployClass().getName() : null,
+                    dep != null ? dep.userVersion() : null,
+                    dep != null ? dep.participants() : null,
+                    dep != null ? dep.classLoaderId() : null,
+                    dep == null);
+
+                try {
+                    ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+                }
+                catch (IgniteCheckedException e) {
+                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                        ((GridFutureAdapter<Object>)fut).onDone(e);
+                    else
+                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
+                            "request (node has left): " + node.id()));
+                }
+            }
+        }
+
+        /**
+         *
+         */
+        void onNodeLeft() {
+            assert !isLocNode;
+            assert bufMappings.get(node.id()) != this;
+
+            if (log.isDebugEnabled())
+                log.debug("Forcibly completing futures (node has left): " + node.id());
+
+            Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                "(node has left): " + node.id());
+
+            for (GridFutureAdapter<Object> f : reqs.values())
+                f.onDone(e);
+
+            // Make sure to complete current future.
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+            }
+
+            curFut0.onDone(e);
+        }
+
+        /**
+         * @param res Response.
+         */
+        void onResponse(DataStreamerResponse res) {
+            if (log.isDebugEnabled())
+                log.debug("Received data load response: " + res);
+
+            GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+            if (f == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Future for request has not been found: " + res.requestId());
+
+                return;
+            }
+
+            Throwable err = null;
+
+            byte[] errBytes = res.errorBytes();
+
+            if (errBytes != null) {
+                try {
+                    GridPeerDeployAware jobPda0 = jobPda;
+
+                    err = ctx.config().getMarshaller().unmarshal(
+                        errBytes,
+                        jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
+
+                    return;
+                }
+            }
+
+            f.onDone(null, err);
+
+            if (log.isDebugEnabled())
+                log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
+        }
+
+        /**
+         *
+         */
+        void cancelAll() {
+            IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
+
+            for (IgniteInternalFuture<?> f : locFuts) {
+                try {
+                    f.cancel();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to cancel mini-future.", e);
+                }
+            }
+
+            for (GridFutureAdapter<?> f : reqs.values())
+                f.onDone(err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            int size;
+
+            synchronized (this) {
+                size = entries.size();
+            }
+
+            return S.toString(Buffer.class, this,
+                "entriesCnt", size,
+                "locFutsSize", locFuts.size(),
+                "reqsSize", reqs.size());
+        }
+    }
+
+    /**
+     * Data streamer peer-deploy aware.
+     */
+    private class DataStreamerPda implements GridPeerDeployAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Deploy class. */
+        private Class<?> cls;
+
+        /** Class loader. */
+        private ClassLoader ldr;
+
+        /** Collection of objects to detect deploy class and class loader. */
+        private Collection<Object> objs;
+
+        /**
+         * Constructs data streamer peer-deploy aware.
+         *
+         * @param objs Collection of objects to detect deploy class and class loader.
+         */
+        private DataStreamerPda(Object... objs) {
+            this.objs = Arrays.asList(objs);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Class<?> deployClass() {
+            if (cls == null) {
+                Class<?> cls0 = null;
+
+                if (depCls != null)
+                    cls0 = depCls;
+                else {
+                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+                        Object o = it.next();
+
+                        if (o != null)
+                            cls0 = U.detectClass(o);
+                    }
+
+                    if (cls0 == null || U.isJdk(cls0))
+                        cls0 = DataStreamerImpl.class;
+                }
+
+                assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
+
+                cls = cls0;
+            }
+
+            return cls;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClassLoader classLoader() {
+            if (ldr == null) {
+                ClassLoader ldr0 = deployClass().getClassLoader();
+
+                // Safety.
+                if (ldr0 == null)
+                    ldr0 = U.gridClassLoader();
+
+                assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
+
+                ldr = ldr0;
+            }
+
+            return ldr;
+        }
+    }
+
+    /**
+     * Isolated updater which only loads entry initial value.
+     */
+    private static class IsolatedUpdater implements Updater<KeyCacheObject, CacheObject>,
+        DataStreamerCacheUpdaters.InternalUpdater {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<KeyCacheObject, CacheObject> cache,
+            Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+            IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
+
+            GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache();
+
+            if (internalCache.isNear())
+                internalCache = internalCache.context().near().dht();
+
+            GridCacheContext cctx = internalCache.context();
+
+            long topVer = cctx.affinity().affinityTopologyVersion();
+
+            GridCacheVersion ver = cctx.versions().next(topVer);
+
+            for (Map.Entry<KeyCacheObject, CacheObject> e : entries) {
+                try {
+                    e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+
+                    GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
+
+                    entry.unswap(true, false);
+
+                    entry.initialValue(e.getValue(),
+                        ver,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        false,
+                        topVer,
+                        GridDrType.DR_LOAD);
+
+                    cctx.evicts().touch(entry, topVer);
+                }
+                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
+                    // No-op.
+                }
+                catch (IgniteCheckedException ex) {
+                    IgniteLogger log = cache.unwrap(Ignite.class).log();
+
+                    U.error(log, "Failed to set initial value for cache entry: " + e, ex);
+                }
+            }
+        }
+    }
+}


Mime
View raw message