ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [62/65] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 21:27:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
new file mode 100644
index 0000000..6f784f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.internal.util.direct.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.nio.*;
+
+/**
+ * Response to a data load request: carries the request ID, optional error bytes and the deployment flag.
+ */
+public class GridDataLoadResponse extends GridTcpCommunicationMessageAdapter {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private long reqId;
+
+    /** Error bytes. */
+    private byte[] errBytes;
+
+    /** Force local deployment flag. */
+    private boolean forceLocDep;
+
+    /**
+     * @param reqId Request ID.
+     * @param errBytes Error bytes.
+     * @param forceLocDep Force local deployment.
+     */
+    public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
+        this.reqId = reqId;
+        this.errBytes = errBytes;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public GridDataLoadResponse() {
+        // No-op.
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Error bytes.
+     */
+    public byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDataLoadResponse.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+    @Override public GridTcpCommunicationMessageAdapter clone() {
+        GridDataLoadResponse _clone = new GridDataLoadResponse();
+
+        clone0(_clone);
+
+        return _clone;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+        GridDataLoadResponse _clone = (GridDataLoadResponse)_msg;
+        // Shallow copy: the error byte array is shared with the clone, not duplicated.
+        _clone.reqId = reqId;
+        _clone.errBytes = errBytes;
+        _clone.forceLocDep = forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean writeTo(ByteBuffer buf) {
+        commState.setBuffer(buf);
+
+        if (!commState.typeWritten) {
+            if (!commState.putByte(directType()))
+                return false;
+
+            commState.typeWritten = true;
+        }
+        // Fields are written in idx order; cases have no break, so a written field falls through to the next.
+        switch (commState.idx) {
+            case 0:
+                if (!commState.putByteArray(errBytes))
+                    return false;
+
+                commState.idx++;
+
+            case 1:
+                if (!commState.putBoolean(forceLocDep))
+                    return false;
+
+                commState.idx++;
+
+            case 2:
+                if (!commState.putLong(reqId))
+                    return false;
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("all")
+    @Override public boolean readFrom(ByteBuffer buf) {
+        commState.setBuffer(buf);
+        // Fields are read in the same order writeTo() writes them; false is returned if one cannot be read yet.
+        switch (commState.idx) {
+            case 0:
+                byte[] errBytes0 = commState.getByteArray();
+
+                if (errBytes0 == BYTE_ARR_NOT_READ)
+                    return false;
+
+                errBytes = errBytes0;
+
+                commState.idx++;
+
+            case 1:
+                if (buf.remaining() < 1)
+                    return false;
+
+                forceLocDep = commState.getBoolean();
+
+                commState.idx++;
+
+            case 2:
+                if (buf.remaining() < 8)
+                    return false;
+
+                reqId = commState.getLong();
+
+                commState.idx++;
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 62;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
new file mode 100644
index 0000000..5b4d4c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.dataload.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Entries to put. */
+    private final Collection<Map.Entry<K, V>> col;
+
+    /** {@code True} to ignore deployment ownership. */
+    private final boolean ignoreDepOwnership;
+
+    /** {@code True} to run the update against the cache obtained via {@code withSkipStore()}. */
+    private final boolean skipStore;
+
+    /** Updater that applies the entries to the cache. */
+    private final IgniteDataLoadCacheUpdater<K, V> updater;
+
+    /**
+     * @param ctx Context.
+     * @param log Log.
+     * @param cacheName Cache name.
+     * @param col Entries to put.
+     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param updater Updater.
+     */
+    GridDataLoadUpdateJob(
+        GridKernalContext ctx,
+        IgniteLogger log,
+        @Nullable String cacheName,
+        Collection<Map.Entry<K, V>> col,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDataLoadCacheUpdater<K, V> updater) {
+        this.ctx = ctx;
+        this.log = log;
+
+        assert col != null && !col.isEmpty();
+        assert updater != null;
+
+        this.cacheName = cacheName;
+        this.col = col;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object call() throws Exception {
+        if (log.isDebugEnabled())
+            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+
+//        TODO IGNITE-77: restore adapter usage.
+//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+//
+//        IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+//        if (!f.isDone())
+//            f.get();
+//
+//        if (ignoreDepOwnership)
+//            cache.context().deploy().ignoreOwnership(true);
+
+        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
+
+        if (skipStore)
+            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
+        // Ownership is ignored only for the duration of the update; the flag is reset in the finally block.
+        if (ignoreDepOwnership)
+            cache.context().deploy().ignoreOwnership(true);
+
+        try {
+            updater.update(cache, col);
+
+            return null;
+        }
+        finally {
+            if (ignoreDepOwnership)
+                cache.context().deploy().ignoreOwnership(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Update job finished on node: " + ctx.localNodeId());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
new file mode 100644
index 0000000..2912db7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+
+import java.io.*;
+
+/**
+ * Data loader future.
+ */
+class GridDataLoaderFuture extends GridFutureAdapter<Object> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Data loader. */
+    @GridToStringExclude
+    private IgniteDataLoader dataLdr;
+
+    /**
+     * Default constructor for {@link Externalizable} support.
+     */
+    public GridDataLoaderFuture() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Context.
+     * @param dataLdr Data loader.
+     */
+    GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoader dataLdr) {
+        super(ctx);
+
+        assert dataLdr != null;
+
+        this.dataLdr = dataLdr;
+    }
+
+    /** {@inheritDoc} Calls {@code dataLdr.close(true)} when {@code onCancelled()} returns {@code true}. */
+    @Override public boolean cancel() throws IgniteCheckedException {
+        checkValid();
+
+        if (onCancelled()) {
+            dataLdr.close(true);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDataLoaderFuture.class, this, super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
new file mode 100644
index 0000000..194d30d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.dataload.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Processor that creates data loaders, runs the loader flusher thread and handles incoming data load requests.
+ */
+public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
+    /** Active loaders set (access is not supposed to be highly concurrent). */
+    private Collection<IgniteDataLoaderImpl> ldrs = new GridConcurrentHashSet<>();
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Flushing thread. */
+    private Thread flusher;
+
+    /** Queue of loaders awaiting flush; consumed by the flushing thread and passed to every created loader. */
+    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>();
+
+    /** Marshaller. */
+    private final IgniteMarshaller marsh;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public GridDataLoaderProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof GridDataLoadRequest;
+
+                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
+            }
+        });
+
+        marsh = ctx.config().getMarshaller();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
+            @Override protected void body() throws InterruptedException, IgniteInterruptedException {
+                while (!isCancelled()) {
+                    IgniteDataLoaderImpl<K, V> ldr = flushQ.take();
+                    // Exit the worker once the busy lock can no longer be entered (onKernalStop() blocks it).
+                    if (!busyLock.enterBusy())
+                        return;
+
+                    try {
+                        if (ldr.isClosed())
+                            continue;
+                        // Closed loaders are dropped above; live ones are flushed and put back into the queue.
+                        ldr.tryFlush();
+
+                        flushQ.offer(ldr);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            }
+        });
+
+        flusher.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Started data loader processor.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        if (ctx.config().isDaemon())
+            return;
+
+        ctx.io().removeMessageListener(TOPIC_DATALOAD);
+
+        busyLock.block();
+
+        U.interrupt(flusher);
+        U.join(flusher, log);
+
+        for (IgniteDataLoader<?, ?> ldr : ldrs) {
+            if (log.isDebugEnabled())
+                log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
+
+            try {
+                ldr.close(cancel);
+            }
+            catch (IgniteInterruptedException e) {
+                U.warn(log, "Interrupted while waiting for completion of the data loader: " + ldr, e);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close data loader: " + ldr, e);
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped data loader processor.");
+    }
+
+    /**
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @param compact {@code true} if data loader should transfer data in compact format.
+     * @return Data loader; it is tracked in {@code ldrs} until its future completes.
+     */
+    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to create data loader (grid is stopping).");
+
+        try {
+            final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);
+
+            ldrs.add(ldr);
+
+            ldr.future().listenAsync(new CI1<IgniteFuture<?>>() {
+                @Override public void apply(IgniteFuture<?> f) {
+                    boolean b = ldrs.remove(ldr);
+
+                    assert b : "Loader has not been added to set: " + ldr;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Loader has been completed: " + ldr);
+                }
+            });
+
+            return ldr;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @return Data loader that transfers data in compact format.
+     */
+    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+        return dataLoader(cacheName, true);
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param req Request.
+     */
+    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
+        if (!busyLock.enterBusy()) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring data load request (node is stopping): " + req);
+
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Processing data load request: " + req);
+
+            Object topic;
+
+            try {
+                topic = marsh.unmarshal(req.responseTopicBytes(), null);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal topic from request: " + req, e);
+
+                return;
+            }
+            // Class loader for unmarshalling: grid class loader if forced, else the resolved global deployment's.
+            ClassLoader clsLdr;
+
+            if (req.forceLocalDeployment())
+                clsLdr = U.gridClassLoader();
+            else {
+                GridDeployment dep = ctx.deploy().getGlobalDeployment(
+                    req.deploymentMode(),
+                    req.sampleClassName(),
+                    req.sampleClassName(),
+                    req.userVersion(),
+                    nodeId,
+                    req.classLoaderId(),
+                    req.participants(),
+                    null);
+
+                if (dep == null) {
+                    sendResponse(nodeId,
+                        topic,
+                        req.requestId(),
+                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
+                            ", req=" + req + ']'),
+                        false);
+
+                    return;
+                }
+
+                clsLdr = dep.classLoader();
+            }
+
+            Collection<Map.Entry<K, V>> col;
+            IgniteDataLoadCacheUpdater<K, V> updater;
+
+            try {
+                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
+                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
+
+                sendResponse(nodeId, topic, req.requestId(), e, false);
+
+                return;
+            }
+
+            GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx,
+                log,
+                req.cacheName(),
+                col,
+                req.ignoreDeploymentOwnership(),
+                req.skipStore(),
+                updater);
+            // The job is executed in the calling thread; a failure is passed to the sender via the response.
+            Exception err = null;
+
+            try {
+                job.call();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to finish update job.", e);
+
+                err = e;
+            }
+
+            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param resTopic Response topic.
+     * @param reqId Request ID.
+     * @param err Error to marshal into the response, or {@code null} if there is none.
+     * @param forceLocDep Force local deployment.
+     */
+    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
+        boolean forceLocDep) {
+        byte[] errBytes;
+        // NOTE: if the error itself cannot be marshalled, the method returns without sending any response.
+        try {
+            errBytes = err != null ? marsh.marshal(err) : null;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to marshal message.", e);
+
+            return;
+        }
+
+        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
+
+        try {
+            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (ctx.discovery().alive(nodeId))
+                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+            else if (log.isDebugEnabled())
+                log.debug("Node has left the grid: " + nodeId);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Data loader processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>>   ldrsSize: " + ldrs.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
new file mode 100644
index 0000000..7ae9a87
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.dataload.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.product.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+import static org.apache.ignite.internal.GridNodeAttributes.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Data loader implementation.
+ */
+public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delayed {
+    /** Version {@code 1.0.0}; presumably the first one supporting compact map entries (TODO confirm). */
+    public static final IgniteProductVersion COMPACT_MAP_ENTRIES_SINCE = IgniteProductVersion.fromString("1.0.0");
+
+    /** Cache updater. */
+    private IgniteDataLoadCacheUpdater<K, V> updater = GridDataLoadCacheUpdaters.individual();
+
+    /** Updater bytes; presumably the marshalled form of {@code updater} (TODO confirm). */
+    private byte[] updaterBytes;
+
+    /** Max remap count before issuing an error. */
+    private static final int MAX_REMAP_CNT = 32;
+
+    /** Log reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Cache name ({@code null} for default cache). */
+    private final String cacheName;
+
+    /** Portable enabled flag. */
+    private final boolean portableEnabled;
+
+    /**
+     *  If {@code true} then data will be transferred in compact format (only keys and values).
+     *  Otherwise full map entry will be transferred (this is required by DR internal logic).
+     */
+    private final boolean compact;
+
+    /** Per-node buffer size. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+    /** Parallel operations setting, initialised to {@code DFLT_MAX_PARALLEL_OPS}. */
+    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+    /** Auto-flush frequency (units are not visible here, TODO confirm). */
+    private long autoFlushFreq;
+
+    /** Mapping. */
+    @GridToStringInclude
+    private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Discovery listener. */
+    private final GridLocalEventListener discoLsnr;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /** Communication topic for responses. */
+    private final Object topic;
+
+    /** Topic bytes; presumably the marshalled form of {@code topic} (TODO confirm). */
+    private byte[] topicBytes;
+
+    /** {@code True} if data loader has been cancelled. */
+    private volatile boolean cancelled;
+
+    /** Active futures of this data loader. */
+    @GridToStringInclude
+    private final Collection<IgniteFuture<?>> activeFuts = new GridConcurrentHashSet<>();
+
+    /** Closure to remove from active futures. */
+    @GridToStringExclude
+    private final IgniteInClosure<IgniteFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteFuture<?>>() {
+        @Override public void apply(IgniteFuture<?> t) {
+            boolean rmv = activeFuts.remove(t);
+
+            assert rmv;
+        }
+    };
+
+    /** Job peer deploy aware. */
+    private volatile GridPeerDeployAware jobPda;
+
+    /** Deployment class. */
+    private Class<?> depCls;
+
+    /** Future to track loading finish. */
+    private final GridFutureAdapter<?> fut;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Closed flag. */
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /** Timestamp initialised from {@code U.currentTimeMillis()}; presumably updated on flush (TODO confirm). */
+    private volatile long lastFlushTime = U.currentTimeMillis();
+
+    /** Flush queue received through the constructor. */
+    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ;
+
+    /** Skip store flag. */
+    private boolean skipStore;
+
+    /**
+     * @param ctx Grid kernal context.
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @param flushQ Flush queue.
+     * @param compact If {@code true} data is transferred in compact mode (only keys and values).
+     *                Otherwise full map entry will be transferred (this is required by DR internal logic).
+     */
+    public IgniteDataLoaderImpl(
+        final GridKernalContext ctx,
+        @Nullable final String cacheName,
+        DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ,
+        boolean compact
+    ) {
+        assert ctx != null;
+
+        this.ctx = ctx;
+        this.cacheName = cacheName;
+        this.flushQ = flushQ;
+        this.compact = compact;
+
+        log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class);
+        // The first node of the cache projection is used to read the portable attribute below.
+        ClusterNode node = F.first(ctx.grid().forCache(cacheName).nodes());
+
+        if (node == null)
+            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+
+        Map<String, Boolean> attrPortable = node.attribute(ATTR_CACHE_PORTABLE);
+
+        Boolean portableEnabled0 = attrPortable == null ? null : attrPortable.get(CU.mask(cacheName));
+
+        portableEnabled = portableEnabled0 == null ? false : portableEnabled0;
+
+        discoLsnr = new GridLocalEventListener() {
+            @Override public void onEvent(IgniteEvent evt) {
+                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+                UUID id = discoEvt.eventNode().id();
+
+                // Remap regular mappings.
+                final Buffer buf = bufMappings.remove(id);
+
+                if (buf != null) {
+                    // Only async notification is possible since
+                    // discovery thread may be trapped otherwise.
+                    ctx.closure().callLocalSafe(
+                        new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                buf.onNodeLeft();
+
+                                return null;
+                            }
+                        },
+                        true /* system pool */
+                    );
+                }
+            }
+        };
+
+        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        // Generate unique topic for this loader.
+        topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+        ctx.io().addMessageListener(topic, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof GridDataLoadResponse;
+
+                GridDataLoadResponse res = (GridDataLoadResponse)msg;
+
+                if (log.isDebugEnabled())
+                    log.debug("Received data load response: " + res);
+                // The buffer is gone if the discovery listener above already removed the node's mapping.
+                Buffer buf = bufMappings.get(nodeId);
+
+                if (buf != null)
+                    buf.onResponse(res);
+
+                else if (log.isDebugEnabled())
+                    log.debug("Ignoring response since node has left [nodeId=" + nodeId + ']');
+            }
+        });
+
+        if (log.isDebugEnabled())
+            log.debug("Added response listener within topic: " + topic);
+
+        fut = new GridDataLoaderFuture(ctx, this);
+    }
+
+    /**
+     * Acquires the busy lock on behalf of a public operation.
+     *
+     * @throws IllegalStateException If the busy lock cannot be entered, which is reported as a closed loader.
+     */
+    private void enterBusy() {
+        if (busyLock.enterBusy())
+            return;
+
+        throw new IllegalStateException("Data loader has been closed.");
+    }
+
+    /**
+     * Leaves the busy lock by delegating to {@code busyLock.leaveBusy()}.
+     */
+    private void leaveBusy() {
+        busyLock.leaveBusy();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the {@code fut} field, which the constructor creates and {@link #close(boolean)} completes.
+     */
+    @Override public IgniteFuture<?> future() {
+        return fut;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The stored class is read by the peer-deploy descriptor, which uses it instead of detecting a class
+     * from the loaded objects whenever it is not {@code null}.
+     */
+    @Override public void deployClass(Class<?> depCls) {
+        this.depCls = depCls;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The argument is checked with {@code A.notNull} before it replaces the current updater.
+     */
+    @Override public void updater(IgniteDataLoadCacheUpdater<K, V> updater) {
+        A.notNull(updater, "updater");
+
+        this.updater = updater;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The loader counts as isolated whenever the current updater is not the object returned by
+     * {@code GridDataLoadCacheUpdaters.individual()} (reference comparison).
+     */
+    @Override public boolean isolated() {
+        return updater != GridDataLoadCacheUpdaters.individual();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Passing {@code true} installs an updater chosen by the cache atomicity mode: the batched updater for
+     * {@code ATOMIC} caches, the group-locked updater otherwise. Passing {@code false}, or calling this when
+     * the loader is already isolated, leaves the current updater untouched; this method never switches
+     * isolation off.
+     *
+     * @throws IgniteCheckedException If no node hosting the cache can be found.
+     */
+    @Override public void isolated(boolean isolated) throws IgniteCheckedException {
+        // Honor the flag: nothing to do unless isolation is requested and not yet in effect.
+        if (!isolated || isolated())
+            return;
+
+        ClusterNode node = F.first(ctx.grid().forCache(cacheName).nodes());
+
+        if (node == null)
+            throw new IgniteCheckedException("Failed to get node for cache: " + cacheName);
+
+        GridCacheAttributes a = U.cacheAttributes(node, cacheName);
+
+        assert a != null;
+
+        updater = a.atomicityMode() == GridCacheAtomicityMode.ATOMIC ?
+            GridDataLoadCacheUpdaters.<K, V>batched() :
+            GridDataLoadCacheUpdaters.<K, V>groupLocked();
+    }
+
+    /** {@inheritDoc} Returns the flag that is passed along with every submitted batch. */
+    @Override public boolean skipStore() {
+        return skipStore;
+    }
+
+    /** {@inheritDoc} The new value applies to batches submitted after this call. */
+    @Override public void skipStore(boolean skipStore) {
+        this.skipStore = skipStore;
+    }
+
+    /** {@inheritDoc} Returns the name given to the constructor, which may be {@code null}. */
+    @Override @Nullable public String cacheName() {
+        return cacheName;
+    }
+
+    /** {@inheritDoc} This is the entry count at which a node buffer is submitted. */
+    @Override public int perNodeBufferSize() {
+        return bufSize;
+    }
+
+    /** {@inheritDoc} The value must be positive; this is enforced with {@code A.ensure}. */
+    @Override public void perNodeBufferSize(int bufSize) {
+        A.ensure(bufSize > 0, "bufSize > 0");
+
+        this.bufSize = bufSize;
+    }
+
+    /** {@inheritDoc} This is the permit count given to the semaphore of each newly created node buffer. */
+    @Override public int perNodeParallelLoadOperations() {
+        return parallelOps;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The value becomes the permit count of the semaphore created for every new node buffer, so it must be
+     * positive: with zero or fewer permits a submission would block while acquiring one. Buffers that
+     * already exist keep the semaphore they were created with.
+     */
+    @Override public void perNodeParallelLoadOperations(int parallelOps) {
+        A.ensure(parallelOps > 0, "parallelOps > 0");
+
+        this.parallelOps = parallelOps;
+    }
+
+    /** {@inheritDoc} Zero means that this loader is not registered in the flush queue by the setter. */
+    @Override public long autoFlushFrequency() {
+        return autoFlushFreq;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Besides storing the value, this keeps the loader's membership in the flush queue in sync: the loader is
+     * added when the frequency changes from zero to non-zero and removed when it changes to zero.
+     */
+    @Override public void autoFlushFrequency(long autoFlushFreq) {
+        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+
+        long prevFreq = this.autoFlushFreq;
+
+        // Same value: neither the field nor the queue needs touching.
+        if (prevFreq == autoFlushFreq)
+            return;
+
+        this.autoFlushFreq = autoFlushFreq;
+
+        if (autoFlushFreq == 0)
+            flushQ.remove(this);
+        else if (prevFreq == 0)
+            flushQ.add(this);
+    }
+
+    /** {@inheritDoc} Delegates to the collection overload with the map's entry set. */
+    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+        A.notNull(entries, "entries");
+
+        return addData(entries.entrySet());
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Creates one result future for the whole collection, registers it among the active futures, collects
+     * the keys that still await acknowledgement and starts the mapping with a remap count of zero. An
+     * {@code IgniteException} thrown on the way is returned as an already finished future.
+     */
+    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+        A.notEmpty(entries, "entries");
+
+        enterBusy();
+
+        try {
+            GridFutureAdapter<Object> opFut = new GridFutureAdapter<>(ctx);
+
+            // Register first, then attach the listener, exactly in this order.
+            activeFuts.add(opFut);
+
+            opFut.listenAsync(rmvActiveFut);
+
+            Collection<K> pendingKeys = new GridConcurrentHashSet<>(entries.size(), 1.0f, 16);
+
+            for (Map.Entry<K, V> e : entries)
+                pendingKeys.add(e.getKey());
+
+            load0(entries, opFut, pendingKeys, 0);
+
+            return opFut;
+        }
+        catch (IgniteException err) {
+            return new GridFinishedFuture<>(ctx, err);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} Wraps the entry into a single-element list and delegates to the collection overload. */
+    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteCheckedException, IllegalStateException {
+        A.notNull(entry, "entry");
+
+        return addData(F.asList(entry));
+    }
+
+    /** {@inheritDoc} Only the key is null-checked here; the value is passed on as is. */
+    @Override public IgniteFuture<?> addData(K key, V val) throws IgniteCheckedException, IllegalStateException {
+        A.notNull(key, "key");
+
+        return addData(new Entry0<>(key, val));
+    }
+
+    /** {@inheritDoc} Implemented as adding the key with a {@code null} value. */
+    @Override public IgniteFuture<?> removeData(K key) throws IgniteCheckedException, IllegalStateException {
+        return addData(key, null);
+    }
+
+    /**
+     * Maps every entry to the node returned by {@code ctx.affinity().mapKeyToNode()} for its key, appends
+     * each per-node group to that node's {@link Buffer} and registers a listener on the buffer future. The
+     * listener removes the group's keys from {@code activeKeys} on success and calls this method again for
+     * the same group, with an incremented remap count, on failure.
+     * <p>
+     * {@code resFut} is completed normally once {@code activeKeys} becomes empty. It is completed with an
+     * error when the remap limit is reached, when mapping throws, when no node is found for a key, when the
+     * loader has been cancelled or when the calling thread is interrupted.
+     *
+     * @param entries Entries.
+     * @param resFut Result future.
+     * @param activeKeys Active keys.
+     * @param remaps Remaps count.
+     */
+    private void load0(
+        Collection<? extends Map.Entry<K, V>> entries,
+        final GridFutureAdapter<Object> resFut,
+        final Collection<K> activeKeys,
+        final int remaps
+    ) {
+        assert entries != null;
+
+        // Give up once this batch has been remapped too many times.
+        if (remaps >= MAX_REMAP_CNT) {
+            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " + remaps));
+
+            return;
+        }
+
+        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
+
+        // The peer-deploy descriptor is created at most once, from the first entry seen while deployment is enabled.
+        boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+        for (Map.Entry<K, V> entry : entries) {
+            ClusterNode node;
+
+            try {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                if (initPda) {
+                    jobPda = new DataLoaderPda(key, entry.getValue(), updater);
+
+                    initPda = false;
+                }
+
+                node = ctx.affinity().mapKeyToNode(cacheName, key);
+            }
+            catch (IgniteCheckedException e) {
+                resFut.onDone(e);
+
+                return;
+            }
+
+            if (node == null) {
+                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+                    "(no nodes with cache found in topology) [infos=" + entries.size() +
+                    ", cacheName=" + cacheName + ']'));
+
+                return;
+            }
+
+            Collection<Map.Entry<K, V>> col = mappings.get(node);
+
+            if (col == null)
+                mappings.put(node, col = new ArrayList<>());
+
+            col.add(entry);
+        }
+
+        // Hand each group to the buffer of its node, creating the buffer on first use.
+        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
+            final UUID nodeId = e.getKey().id();
+
+            Buffer buf = bufMappings.get(nodeId);
+
+            if (buf == null) {
+                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+                if (old != null)
+                    buf = old;
+            }
+
+            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
+
+            IgniteInClosure<IgniteFuture<?>> lsnr = new IgniteInClosure<IgniteFuture<?>>() {
+                @Override public void apply(IgniteFuture<?> t) {
+                    try {
+                        t.get();
+
+                        // Success: this group's keys are no longer pending.
+                        for (Map.Entry<K, V> e : entriesForNode)
+                            activeKeys.remove(e.getKey());
+
+                        if (activeKeys.isEmpty())
+                            resFut.onDone();
+                    }
+                    catch (IgniteCheckedException e1) {
+                        if (log.isDebugEnabled())
+                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                        // Failure: remap the same group unless the loader has been cancelled.
+                        if (cancelled) {
+                            resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " +
+                                IgniteDataLoaderImpl.this, e1));
+                        }
+                        else
+                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
+                    }
+                }
+            };
+
+            GridFutureAdapter<?> f;
+
+            try {
+                f = buf.update(entriesForNode, lsnr);
+            }
+            catch (IgniteInterruptedException e1) {
+                resFut.onDone(e1);
+
+                return;
+            }
+
+            // The node may be gone by now: detach its buffer and fail the batch future, which triggers the listener above.
+            if (ctx.discovery().node(nodeId) == null) {
+                if (bufMappings.remove(nodeId, buf))
+                    buf.onNodeLeft();
+
+                if (f != null)
+                    f.onDone(new ClusterTopologyException("Failed to wait for request completion " +
+                        "(node has left): " + nodeId));
+            }
+        }
+    }
+
+    /**
+     * Performs flush.
+     * <p>
+     * Takes a snapshot of the active futures that are not done yet, then repeatedly flushes every node
+     * buffer and waits for the returned flush futures until all snapshot futures are done. Calling
+     * {@code get()} on a done future rethrows its failure, so a failed operation ends the flush with that
+     * error. A failed flush future only causes another round, because the affected entries get remapped.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void doFlush() throws IgniteCheckedException {
+        lastFlushTime = U.currentTimeMillis();
+
+        List<IgniteFuture> activeFuts0 = null;
+
+        // NOTE(review): the value accumulated in this first loop is never read; it is reset to 0 below.
+        int doneCnt = 0;
+
+        for (IgniteFuture<?> f : activeFuts) {
+            if (!f.isDone()) {
+                if (activeFuts0 == null)
+                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
+
+                activeFuts0.add(f);
+            }
+            else {
+                f.get();
+
+                doneCnt++;
+            }
+        }
+
+        // Nothing was pending at the time of the call.
+        if (activeFuts0 == null || activeFuts0.isEmpty())
+            return;
+
+        while (true) {
+            Queue<IgniteFuture<?>> q = null;
+
+            for (Buffer buf : bufMappings.values()) {
+                IgniteFuture<?> flushFut = buf.flush();
+
+                if (flushFut != null) {
+                    if (q == null)
+                        q = new ArrayDeque<>(bufMappings.size() * 2);
+
+                    q.add(flushFut);
+                }
+            }
+
+            if (q != null) {
+                assert !q.isEmpty();
+
+                boolean err = false;
+
+                // Wait for every flush future of this round, remembering whether any of them failed.
+                for (IgniteFuture fut = q.poll(); fut != null; fut = q.poll()) {
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to flush buffer: " + e);
+
+                        err = true;
+                    }
+                }
+
+                if (err)
+                    // Remaps needed - flush buffers.
+                    continue;
+            }
+
+            doneCnt = 0;
+
+            // Count finished snapshot futures; slots already checked are nulled out so get() is called once per future.
+            for (int i = 0; i < activeFuts0.size(); i++) {
+                IgniteFuture f = activeFuts0.get(i);
+
+                if (f == null)
+                    doneCnt++;
+                else if (f.isDone()) {
+                    f.get();
+
+                    doneCnt++;
+
+                    activeFuts0.set(i, null);
+                }
+                else
+                    // First unfinished future found: another flush round is required.
+                    break;
+            }
+
+            if (doneCnt == activeFuts0.size())
+                return;
+        }
+    }
+
+    /** {@inheritDoc} Runs {@code doFlush()} under the busy lock, so it fails with {@link IllegalStateException} once the loader is closed. */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void flush() throws IgniteCheckedException {
+        enterBusy();
+
+        try {
+            doFlush();
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Best-effort flush: asks every node buffer to flush and records the current time as the last flush
+     * time.
+     * <p>
+     * The futures returned by the buffers are not awaited, and nothing happens at all when the busy lock
+     * cannot be entered. The method is meant to be called periodically.
+     */
+    @Override public void tryFlush() throws IgniteInterruptedException {
+        if (busyLock.enterBusy()) {
+            try {
+                for (Buffer nodeBuf : bufMappings.values())
+                    nodeBuf.flush();
+
+                lastFlushTime = U.currentTimeMillis();
+            }
+            finally {
+                leaveBusy();
+            }
+        }
+    }
+
+    /**
+     * Closes the loader. Only the first call has any effect.
+     * <p>
+     * With {@code cancel} set, all buffered and in-flight work is cancelled; otherwise the remaining data is
+     * flushed first. The discovery and message listeners registered by the constructor are removed in
+     * either case, even when the final flush fails. The loader future is then completed with the flush
+     * error, if any, and that error is rethrown.
+     *
+     * @param cancel {@code True} to close with cancellation.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Override public void close(boolean cancel) throws IgniteCheckedException {
+        if (!closed.compareAndSet(false, true))
+            return;
+
+        busyLock.block();
+
+        if (log.isDebugEnabled())
+            log.debug("Closing data loader [ldr=" + this + ", cancel=" + cancel + ']');
+
+        IgniteCheckedException e = null;
+
+        try {
+            // Assuming that no methods are called on this loader after this method is called.
+            if (cancel) {
+                cancelled = true;
+
+                for (Buffer buf : bufMappings.values())
+                    buf.cancelAll();
+            }
+            else
+                doFlush();
+        }
+        catch (IgniteCheckedException e0) {
+            e = e0;
+        }
+        finally {
+            // The loader cannot be closed twice, so a failed flush must not leave the listeners registered.
+            ctx.event().removeLocalEventListener(discoLsnr);
+
+            ctx.io().removeMessageListener(topic);
+        }
+
+        fut.onDone(null, e);
+
+        if (e != null)
+            throw e;
+    }
+
+    /**
+     * @return {@code true} If the loader future is done, which {@link #close(boolean)} arranges once closing
+     *      has finished.
+     */
+    boolean isClosed() {
+        return fut.isDone();
+    }
+
+    /** {@inheritDoc} Equivalent to {@code close(false)}, i.e. closing with a final flush. */
+    @Override public void close() throws IgniteCheckedException {
+        close(false);
+    }
+
+    /** {@inheritDoc} Built by {@code S.toString} for this class. */
+    @Override public String toString() {
+        return S.toString(IgniteDataLoaderImpl.class, this);
+    }
+
+    /** {@inheritDoc} The delay is the time left until the next flush time, converted from milliseconds to {@code unit}. */
+    @Override public long getDelay(TimeUnit unit) {
+        return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return Next flush time: the last flush time plus the auto-flush frequency.
+     */
+    private long nextFlushTime() {
+        return lastFlushTime + autoFlushFreq;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Loaders are ordered by their next flush time. Equal flush times compare as {@code 0}, as the
+     * {@link Comparable} contract requires (in particular a loader compares as equal to itself).
+     *
+     * @throws ClassCastException If {@code o} is not an {@code IgniteDataLoaderImpl}.
+     */
+    @Override public int compareTo(Delayed o) {
+        return Long.compare(nextFlushTime(), ((IgniteDataLoaderImpl<?, ?>)o).nextFlushTime());
+    }
+
+    /**
+     *
+     */
+    private class Buffer {
+        /** Node. */
+        private final ClusterNode node;
+
+        /** Active futures. */
+        private final Collection<IgniteFuture<Object>> locFuts;
+
+        /** Buffered entries. */
+        private List<Map.Entry<K, V>> entries;
+
+        /** */
+        @GridToStringExclude
+        private GridFutureAdapter<Object> curFut;
+
+        /** Local node flag. */
+        private final boolean isLocNode;
+
+        /** ID generator. */
+        private final AtomicLong idGen = new AtomicLong();
+
+        /** Active futures. */
+        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+        /** */
+        private final Semaphore sem;
+
+        /** Closure to signal on task finish. */
+        @GridToStringExclude
+        private final IgniteInClosure<IgniteFuture<Object>> signalC = new IgniteInClosure<IgniteFuture<Object>>() {
+            @Override public void apply(IgniteFuture<Object> t) {
+                signalTaskFinished(t);
+            }
+        };
+
+        /**
+         * Creates an empty buffer for the given node, with a fresh entry list, a fresh current future that has
+         * the permit-releasing closure attached, and a semaphore sized by the loader's current
+         * {@code parallelOps} value.
+         *
+         * @param node Node.
+         */
+        Buffer(ClusterNode node) {
+            assert node != null;
+
+            this.node = node;
+
+            locFuts = new GridConcurrentHashSet<>();
+            reqs = new ConcurrentHashMap8<>();
+
+            // Cache local node flag.
+            isLocNode = node.equals(ctx.discovery().localNode());
+
+            entries = newEntries();
+            curFut = new GridFutureAdapter<>(ctx);
+            curFut.listenAsync(signalC);
+
+            sem = new Semaphore(parallelOps);
+        }
+
+        /**
+         * Appends entries to the buffer and attaches {@code lsnr} to the future of the batch they joined.
+         * When the buffer reaches {@code bufSize} entries, the batch is swapped for a fresh one under the
+         * monitor and submitted outside of it.
+         *
+         * @param newEntries Infos.
+         * @param lsnr Listener for the operation future.
+         * @throws org.apache.ignite.IgniteInterruptedException If failed.
+         * @return Future for operation.
+         */
+        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
+            IgniteInClosure<IgniteFuture<?>> lsnr) throws IgniteInterruptedException {
+            List<Map.Entry<K, V>> entries0 = null;
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+
+                curFut0.listenAsync(lsnr);
+
+                for (Map.Entry<K, V> entry : newEntries)
+                    entries.add(entry);
+
+                // Buffer is full: detach the batch and start a new one with its own future.
+                if (entries.size() >= bufSize) {
+                    entries0 = entries;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>(ctx);
+                    curFut.listenAsync(signalC);
+                }
+            }
+
+            if (entries0 != null) {
+                submit(entries0, curFut0);
+
+                // The loader may have been cancelled in the meantime; fail the batch in that case.
+                if (cancelled)
+                    curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this));
+            }
+
+            return curFut0;
+        }
+
+        /**
+         * @return Fresh collection with some space for outgrowth (initial capacity is 1.2 times the buffer size).
+         */
+        private List<Map.Entry<K, V>> newEntries() {
+            return new ArrayList<>((int)(bufSize * 1.2));
+        }
+
+        /**
+         * Submits the currently buffered entries, if any, and returns a compound future over all local
+         * futures and all outstanding remote requests of this buffer.
+         *
+         * @return Future if any submitted; {@code null} when there is neither a local future nor an
+         *      outstanding request.
+         *
+         * @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted.
+         */
+        @Nullable
+        IgniteFuture<?> flush() throws IgniteInterruptedException {
+            List<Map.Entry<K, V>> entries0 = null;
+            GridFutureAdapter<Object> curFut0 = null;
+
+            // Detach the pending batch under the monitor; the submission itself happens outside of it.
+            synchronized (this) {
+                if (!entries.isEmpty()) {
+                    entries0 = entries;
+                    curFut0 = curFut;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>(ctx);
+                    curFut.listenAsync(signalC);
+                }
+            }
+
+            if (entries0 != null)
+                submit(entries0, curFut0);
+
+            // Create compound future for this flush.
+            GridCompoundFuture<Object, Object> res = null;
+
+            for (IgniteFuture<Object> f : locFuts) {
+                if (res == null)
+                    res = new GridCompoundFuture<>(ctx);
+
+                res.add(f);
+            }
+
+            for (IgniteFuture<Object> f : reqs.values()) {
+                if (res == null)
+                    res = new GridCompoundFuture<>(ctx);
+
+                res.add(f);
+            }
+
+            if (res != null)
+                res.markInitialized();
+
+            return res;
+        }
+
+        /**
+         * Increments active tasks count by acquiring one permit from the buffer's semaphore.
+         *
+         * @throws org.apache.ignite.IgniteInterruptedException If thread has been interrupted.
+         */
+        private void incrementActiveTasks() throws IgniteInterruptedException {
+            U.acquire(sem);
+        }
+
+        /**
+         * Releases one semaphore permit. The future itself is only checked for being non-null.
+         *
+         * @param f Future that finished.
+         */
+        private void signalTaskFinished(IgniteFuture<Object> f) {
+            assert f != null;
+
+            sem.release();
+        }
+
+        /**
+         * Submits one batch: runs it as a local job when this buffer belongs to the local node, otherwise
+         * marshals it and sends a data load request to the remote node.
+         * <p>
+         * A semaphore permit is acquired first and is given back by the closure attached to the batch future
+         * when that future completes. For that reason every failure path below completes {@code curFut}:
+         * leaving it pending would both leak the permit and keep the callers of the batch waiting forever.
+         *
+         * @param entries Entries to submit.
+         * @param curFut Current future.
+         * @throws org.apache.ignite.IgniteInterruptedException If interrupted.
+         */
+        private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
+            throws IgniteInterruptedException {
+            assert entries != null;
+            assert !entries.isEmpty();
+            assert curFut != null;
+
+            incrementActiveTasks();
+
+            if (isLocNode) {
+                IgniteFuture<Object> fut = ctx.closure().callLocalSafe(
+                    new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+
+                locFuts.add(fut);
+
+                // Propagate the job outcome to the batch future once the local job is done.
+                fut.listenAsync(new IgniteInClosure<IgniteFuture<Object>>() {
+                    @Override public void apply(IgniteFuture<Object> t) {
+                        try {
+                            boolean rmv = locFuts.remove(t);
+
+                            assert rmv;
+
+                            curFut.onDone(t.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            curFut.onDone(e);
+                        }
+                    }
+                });
+            }
+            else {
+                byte[] entriesBytes;
+
+                try {
+                    if (compact) {
+                        if (node.version().compareTo(COMPACT_MAP_ENTRIES_SINCE) < 0) {
+                            // Receiver predates the compact wire format: send a plain list of entries.
+                            Collection<Map.Entry<K, V>> entries0 = new ArrayList<>(entries.size());
+
+                            GridPortableProcessor portable = ctx.portable();
+
+                            for (Map.Entry<K, V> entry : entries)
+                                entries0.add(new Entry0<>(
+                                    portableEnabled ? (K)portable.marshalToPortable(entry.getKey()) : entry.getKey(),
+                                    portableEnabled ? (V)portable.marshalToPortable(entry.getValue()) : entry.getValue()));
+
+                            entriesBytes = ctx.config().getMarshaller().marshal(entries0);
+                        }
+                        else
+                            entriesBytes = ctx.config().getMarshaller()
+                                .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
+                    }
+                    else
+                        entriesBytes = ctx.config().getMarshaller().marshal(entries);
+
+                    // Updater and topic are marshalled once and then reused for all requests.
+                    if (updaterBytes == null) {
+                        assert updater != null;
+
+                        updaterBytes = ctx.config().getMarshaller().marshal(updater);
+                    }
+
+                    if (topicBytes == null)
+                        topicBytes = ctx.config().getMarshaller().marshal(topic);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to marshal (request will not be sent).", e);
+
+                    // Fail the batch so that its waiters are notified and the permit is returned.
+                    curFut.onDone(e);
+
+                    return;
+                }
+
+                GridDeployment dep = null;
+                GridPeerDeployAware jobPda0 = null;
+
+                if (ctx.deploy().enabled()) {
+                    try {
+                        jobPda0 = jobPda;
+
+                        assert jobPda0 != null;
+
+                        dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+
+                        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+                        if (cache != null)
+                            cache.context().deploy().onEnter();
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+
+                        // Same as above: never leave the batch future pending.
+                        curFut.onDone(e);
+
+                        return;
+                    }
+
+                    if (dep == null)
+                        U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+                }
+
+                long reqId = idGen.incrementAndGet();
+
+                // Register before sending so that a fast response always finds its future.
+                reqs.put(reqId, curFut);
+
+                GridDataLoadRequest req = new GridDataLoadRequest(
+                    reqId,
+                    topicBytes,
+                    cacheName,
+                    updaterBytes,
+                    entriesBytes,
+                    true,
+                    skipStore,
+                    dep != null ? dep.deployMode() : null,
+                    dep != null ? jobPda0.deployClass().getName() : null,
+                    dep != null ? dep.userVersion() : null,
+                    dep != null ? dep.participants() : null,
+                    dep != null ? dep.classLoaderId() : null,
+                    dep == null);
+
+                try {
+                    ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+                }
+                catch (IgniteCheckedException e) {
+                    // No response will ever arrive for this request; a failed future left in the map
+                    // would be picked up by every subsequent flush of this buffer.
+                    reqs.remove(reqId);
+
+                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                        curFut.onDone(e);
+                    else
+                        curFut.onDone(new ClusterTopologyException("Failed to send " +
+                            "request (node has left): " + node.id()));
+                }
+            }
+        }
+
+        /**
+         * Fails every outstanding request future and the current batch future of this buffer with a
+         * topology exception. Expected to be called only for a remote node whose buffer has already been
+         * removed from the buffer mappings (both are asserted).
+         */
+        void onNodeLeft() {
+            assert !isLocNode;
+            assert bufMappings.get(node.id()) != this;
+
+            if (log.isDebugEnabled())
+                log.debug("Forcibly completing futures (node has left): " + node.id());
+
+            Exception e = new ClusterTopologyException("Failed to wait for request completion " +
+                "(node has left): " + node.id());
+
+            for (GridFutureAdapter<Object> f : reqs.values())
+                f.onDone(e);
+
+            // Make sure to complete current future.
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+            }
+
+            curFut0.onDone(e);
+        }
+
+        /**
+         * Completes the future registered for the request the response refers to. The future finishes with
+         * the error carried by the response, if any, or with a wrapping exception when that error cannot be
+         * unmarshalled. Responses for unknown request IDs are ignored.
+         *
+         * @param res Response.
+         */
+        void onResponse(GridDataLoadResponse res) {
+            if (log.isDebugEnabled())
+                log.debug("Received data load response: " + res);
+
+            GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+            if (f == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Future for request has not been found: " + res.requestId());
+
+                return;
+            }
+
+            Throwable err = null;
+
+            byte[] errBytes = res.errorBytes();
+
+            if (errBytes != null) {
+                try {
+                    GridPeerDeployAware jobPda0 = jobPda;
+
+                    // Use the deployment class loader when there is one, the grid class loader otherwise.
+                    err = ctx.config().getMarshaller().unmarshal(
+                        errBytes,
+                        jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
+
+                    return;
+                }
+            }
+
+            f.onDone(null, err);
+
+            if (log.isDebugEnabled())
+                log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
+        }
+
+        /**
+         * Cancels every local future of this buffer and fails every outstanding request future with a
+         * "cancelled" exception. A failure to cancel a local future is logged and does not stop the loop.
+         */
+        void cancelAll() {
+            IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this);
+
+            for (IgniteFuture<?> f : locFuts) {
+                try {
+                    f.cancel();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to cancel mini-future.", e);
+                }
+            }
+
+            for (GridFutureAdapter<?> f : reqs.values())
+                f.onDone(err);
+        }
+
+        /** {@inheritDoc} Adds the number of buffered entries (read under the monitor), local futures and outstanding requests. */
+        @Override public String toString() {
+            int size;
+
+            synchronized (this) {
+                size = entries.size();
+            }
+
+            return S.toString(Buffer.class, this,
+                "entriesCnt", size,
+                "locFutsSize", locFuts.size(),
+                "reqsSize", reqs.size());
+        }
+    }
+
+    /**
+     * Peer-deploy descriptor of the data loader. The deploy class and its class loader are resolved lazily
+     * on first request and remembered afterwards.
+     */
+    private class DataLoaderPda implements GridPeerDeployAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Resolved deploy class, {@code null} until first requested. */
+        private Class<?> cls;
+
+        /** Resolved class loader, {@code null} until first requested. */
+        private ClassLoader ldr;
+
+        /** Objects inspected to find the deploy class when none was set explicitly. */
+        private Collection<Object> objs;
+
+        /**
+         * @param objs Objects to detect deploy class and class loader from.
+         */
+        private DataLoaderPda(Object... objs) {
+            this.objs = Arrays.asList(objs);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * An explicitly set deploy class wins. Otherwise the objects are scanned in order until a non-JDK
+         * class is detected, and the loader implementation class is used when no such class is found.
+         */
+        @Override public Class<?> deployClass() {
+            if (cls != null)
+                return cls;
+
+            Class<?> resolved = null;
+
+            if (depCls != null)
+                resolved = depCls;
+            else {
+                for (Object candidate : objs) {
+                    // Stop as soon as a usable (non-JDK) class has been found.
+                    if (resolved != null && !U.isJdk(resolved))
+                        break;
+
+                    if (candidate != null)
+                        resolved = U.detectClass(candidate);
+                }
+
+                if (resolved == null || U.isJdk(resolved))
+                    resolved = IgniteDataLoaderImpl.class;
+            }
+
+            assert resolved != null : "Failed to detect deploy class [objs=" + objs + ']';
+
+            cls = resolved;
+
+            return resolved;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This is the class loader of the deploy class, or the grid class loader when the deploy class
+         * reports none.
+         */
+        @Override public ClassLoader classLoader() {
+            if (ldr != null)
+                return ldr;
+
+            ClassLoader resolved = deployClass().getClassLoader();
+
+            // Safety.
+            if (resolved == null)
+                resolved = U.gridClassLoader();
+
+            assert resolved != null : "Failed to detect classloader [objs=" + objs + ']';
+
+            ldr = resolved;
+
+            return resolved;
+        }
+    }
+
+    /**
+     * Plain key-value pair that can be written and read through {@link Externalizable}. It backs
+     * single-entry additions and is the element type of the entry lists sent to other nodes.
+     */
+    private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Entry key. */
+        private K key;
+
+        /** Entry value, possibly {@code null}. */
+        private V val;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        @SuppressWarnings("UnusedDeclaration")
+        public Entry0() {
+            // No-op.
+        }
+
+        /**
+         * @param key Key, must not be {@code null}.
+         * @param val Value.
+         */
+        private Entry0(K key, @Nullable V val) {
+            assert key != null;
+
+            this.val = val;
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public K getKey() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V setValue(V val) {
+            V prev = this.val;
+
+            this.val = val;
+
+            return prev;
+        }
+
+        /** {@inheritDoc} Writes the key first, then the value. */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(key);
+            out.writeObject(val);
+        }
+
+        /** {@inheritDoc} Reads the key first, then the value. */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            key = (K)in.readObject();
+            val = (V)in.readObject();
+        }
+    }
+
+    /**
+     * Collection wrapper that serializes map entries compactly: the size followed by alternating keys and
+     * values, without a per-entry wrapper object. When a portable processor is supplied, keys and values
+     * are converted with it before being written.
+     */
+    private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Wrapped entries. */
+        private Collection<Map.Entry<K, V>> delegate;
+
+        /** Portable processor used on write, or {@code null} to write keys and values as they are. */
+        private GridPortableProcessor portable;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public Entries0() {
+            // No-op.
+        }
+
+        /**
+         * @param delegate Entries to wrap.
+         * @param portable Portable processor, may be {@code null}.
+         */
+        private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
+            this.portable = portable;
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<Entry<K, V>> iterator() {
+            return delegate.iterator();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return delegate.size();
+        }
+
+        /** {@inheritDoc} Writes the entry count, then key and value of every entry in iteration order. */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(delegate.size());
+
+            for (Map.Entry<K, V> e : delegate) {
+                if (portable == null) {
+                    out.writeObject(e.getKey());
+                    out.writeObject(e.getValue());
+                }
+                else {
+                    out.writeObject(portable.marshalToPortable(e.getKey()));
+                    out.writeObject(portable.marshalToPortable(e.getValue()));
+                }
+            }
+        }
+
+        /** {@inheritDoc} Rebuilds the entries as a list of {@link Entry0}; the portable processor stays unset. */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            int cnt = in.readInt();
+
+            delegate = new ArrayList<>(cnt);
+
+            for (int idx = 0; idx < cnt; idx++) {
+                Object key = in.readObject();
+                Object val = in.readObject();
+
+                delegate.add(new Entry0<>((K)key, (V)val));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
new file mode 100644
index 0000000..50a90ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Data loader processor.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java
index 46ea537..a570ce0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsDataManager.java
@@ -31,7 +31,7 @@ import org.apache.ignite.transactions.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.dataload.*;
+import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
index d75a45c..323c1a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsIpcHandler.java
@@ -22,7 +22,7 @@ import org.apache.ignite.fs.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.internal.fs.common.*;
-import org.gridgain.grid.kernal.processors.closure.*;
+import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.processors.license.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java
index 293f956..a960472 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerBroadcastTask.java
@@ -23,7 +23,7 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.streamer.*;
-import org.gridgain.grid.kernal.processors.closure.*;
+import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java
index 16004ee..600e8e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerQueryTask.java
@@ -23,7 +23,7 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.streamer.*;
-import org.gridgain.grid.kernal.processors.closure.*;
+import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java
index 32e346c..b2d106d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/task/GridStreamerReduceTask.java
@@ -23,7 +23,7 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.streamer.*;
-import org.gridgain.grid.kernal.processors.closure.*;
+import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
index 892464b..81f74f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
@@ -30,8 +30,8 @@ import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.near.*;
 import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.clock.*;
-import org.gridgain.grid.kernal.processors.continuous.*;
-import org.gridgain.grid.kernal.processors.dataload.*;
+import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.processors.rest.handlers.task.*;
 import org.apache.ignite.spi.collision.jobstealing.*;
 import org.apache.ignite.spi.communication.tcp.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignment.java
deleted file mode 100644
index f3e2c8e..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityAssignment.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.affinity;
-
-import org.apache.ignite.cluster.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Cached affinity calculations.
- */
-class GridAffinityAssignment implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Topology version. */
-    private final long topVer;
-
-    /** Collection of calculated affinity nodes. */
-    private List<List<ClusterNode>> assignment;
-
-    /** Map of primary node partitions. */
-    private final Map<UUID, Set<Integer>> primary;
-
-    /** Map of backup node partitions. */
-    private final Map<UUID, Set<Integer>> backup;
-
-    /**
-     * Constructs cached affinity calculations item.
-     *
-     * @param topVer Topology version.
-     */
-    GridAffinityAssignment(long topVer) {
-        this.topVer = topVer;
-        primary = new HashMap<>();
-        backup = new HashMap<>();
-    }
-
-    /**
-     * @param topVer Topology version.
-     * @param assignment Assignment.
-     */
-    GridAffinityAssignment(long topVer, List<List<ClusterNode>> assignment) {
-        this.topVer = topVer;
-        this.assignment = assignment;
-
-        primary = new HashMap<>();
-        backup = new HashMap<>();
-
-        initPrimaryBackupMaps();
-    }
-
-    /**
-     * @return Affinity assignment.
-     */
-    public List<List<ClusterNode>> assignment() {
-        return assignment;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    public long topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * Get affinity nodes for partition.
-     *
-     * @param part Partition.
-     * @return Affinity nodes.
-     */
-    public List<ClusterNode> get(int part) {
-        assert part >= 0 && part < assignment.size() : "Affinity partition is out of range" +
-            " [part=" + part + ", partitions=" + assignment.size() + ']';
-
-        return assignment.get(part);
-    }
-
-    /**
-     * Get primary partitions for specified node ID.
-     *
-     * @param nodeId Node ID to get primary partitions for.
-     * @return Primary partitions for specified node ID.
-     */
-    public Set<Integer> primaryPartitions(UUID nodeId) {
-        Set<Integer> set = primary.get(nodeId);
-
-        return set == null ? Collections.<Integer>emptySet() : Collections.unmodifiableSet(set);
-    }
-
-    /**
-     * Get backup partitions for specified node ID.
-     *
-     * @param nodeId Node ID to get backup partitions for.
-     * @return Backup partitions for specified node ID.
-     */
-    public Set<Integer> backupPartitions(UUID nodeId) {
-        Set<Integer> set = backup.get(nodeId);
-
-        return set == null ? Collections.<Integer>emptySet() : Collections.unmodifiableSet(set);
-    }
-
-    /**
-     * Initializes primary and backup maps.
-     */
-    private void initPrimaryBackupMaps() {
-        // Temporary mirrors with modifiable partition's collections.
-        Map<UUID, Set<Integer>> tmpPrm = new HashMap<>();
-        Map<UUID, Set<Integer>> tmpBkp = new HashMap<>();
-
-        for (int partsCnt = assignment.size(), p = 0; p < partsCnt; p++) {
-            // Use the first node as primary, other - backups.
-            Map<UUID, Set<Integer>> tmp = tmpPrm;
-            Map<UUID, Set<Integer>> map = primary;
-
-            for (ClusterNode node : assignment.get(p)) {
-                UUID id = node.id();
-
-                Set<Integer> set = tmp.get(id);
-
-                if (set == null) {
-                    tmp.put(id, set = new HashSet<>());
-                    map.put(id, Collections.unmodifiableSet(set));
-                }
-
-                set.add(p);
-
-                // Use the first node as primary, other - backups.
-                tmp = tmpBkp;
-                map = backup;
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return (int)(topVer ^ (topVer >>> 32));
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("SimplifiableIfStatement")
-    @Override public boolean equals(Object o) {
-        if (o == this)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        return topVer == ((GridAffinityAssignment)o).topVer;
-    }
-}


Mime
View raw message