ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/50] [abbrv] incubator-ignite git commit: renaming wip
Date Thu, 12 Mar 2015 17:21:04 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java
new file mode 100644
index 0000000..c7abd3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerProcessor.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Tracks local data streamers, flushes them in background and applies incoming {@link DataStreamerRequest}s.
+ */
+public class DataStreamerProcessor<K, V> extends GridProcessorAdapter {
+    /** Set of active data streamers (access is not supposed to be highly concurrent). */
+    private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
+
+    /** Busy lock; blocked in {@link #onKernalStop(boolean)} so that new work is rejected while stopping. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Flushing thread; created in {@link #start()} unless the node is a daemon. */
+    private Thread flusher;
+
+    /** Delay queue the flusher thread takes streamers from; handed to every streamer created here. */
+    private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
+
+    /** Marshaller taken from the node configuration; used for topics, updaters and response errors. */
+    private final Marshaller marsh;
+
+    /**
+     * @param ctx Kernal context; its IO manager gets the {@code TOPIC_DATASTREAM} listener registered here.
+     */
+    public DataStreamerProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof DataStreamerRequest;
+
+                processRequest(nodeId, (DataStreamerRequest)msg);
+            }
+        });
+
+        marsh = ctx.config().getMarshaller();
+    }
+
+    /** {@inheritDoc} Starts the flusher thread; returns without doing anything on daemon nodes. */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
+            @Override protected void body() throws InterruptedException {
+                while (!isCancelled()) {
+                    DataStreamerImpl<K, V> ldr = flushQ.take();
+
+                    if (!busyLock.enterBusy())
+                        return;
+
+                    try {
+                        if (ldr.isClosed())
+                            continue;
+
+                        ldr.tryFlush();
+
+                        flushQ.offer(ldr);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            }
+        });
+
+        flusher.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Started data streamer processor.");
+    }
+
+    /** {@inheritDoc} Blocks the busy lock, stops the flusher and closes all still-registered streamers. */
+    @Override public void onKernalStop(boolean cancel) {
+        if (ctx.config().isDaemon())
+            return;
+
+        ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+
+        busyLock.block();
+
+        U.interrupt(flusher);
+        U.join(flusher, log);
+
+        for (DataStreamerImpl<?, ?> ldr : ldrs) {
+            if (log.isDebugEnabled())
+                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
+
+            try {
+                ldr.closeEx(cancel);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close data streamer: " + ldr, e);
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped data streamer processor.");
+    }
+
+    /**
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @return New data streamer, registered with this processor until its internal future completes.
+     */
+    public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
+
+        try {
+            final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx, cacheName, flushQ);
+
+            ldrs.add(ldr);
+
+            ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
+                    boolean b = ldrs.remove(ldr);
+
+                    assert b : "Loader has not been added to set: " + ldr;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Loader has been completed: " + ldr);
+                }
+            });
+
+            return ldr;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param nodeId Sender ID; also the node the response is sent back to.
+     * @param req Request carrying the marshalled response topic, the marshalled updater and the entries to apply.
+     */
+    private void processRequest(UUID nodeId, DataStreamerRequest req) {
+        if (!busyLock.enterBusy()) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring data load request (node is stopping): " + req);
+
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Processing data load request: " + req);
+
+            Object topic;
+
+            try {
+                topic = marsh.unmarshal(req.responseTopicBytes(), null);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal topic from request: " + req, e);
+
+                return;
+            }
+
+            ClassLoader clsLdr;
+
+            if (req.forceLocalDeployment())
+                clsLdr = U.gridClassLoader();
+            else {
+                GridDeployment dep = ctx.deploy().getGlobalDeployment(
+                    req.deploymentMode(),
+                    req.sampleClassName(),
+                    req.sampleClassName(),
+                    req.userVersion(),
+                    nodeId,
+                    req.classLoaderId(),
+                    req.participants(),
+                    null);
+
+                if (dep == null) {
+                    sendResponse(nodeId,
+                        topic,
+                        req.requestId(),
+                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
+                            ", req=" + req + ']'),
+                        false);
+
+                    return;
+                }
+
+                clsLdr = dep.classLoader();
+            }
+
+            IgniteDataStreamer.Updater<K, V> updater;
+
+            try {
+                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
+
+                sendResponse(nodeId, topic, req.requestId(), e, false);
+
+                return;
+            }
+
+            Collection<DataStreamerEntry> col = req.entries();
+
+            DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
+                log,
+                req.cacheName(),
+                col,
+                req.ignoreDeploymentOwnership(),
+                req.skipStore(),
+                updater);
+
+            Exception err = null;
+
+            try {
+                job.call();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to finish update job.", e);
+
+                err = e;
+            }
+
+            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param nodeId ID of the node to respond to.
+     * @param resTopic Response topic.
+     * @param reqId Request ID.
+     * @param err Error to marshal into the response, or {@code null}; nothing is sent if marshalling it fails.
+     * @param forceLocDep Force local deployment.
+     */
+    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
+        boolean forceLocDep) {
+        byte[] errBytes;
+
+        try {
+            errBytes = err != null ? marsh.marshal(err) : null;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to marshal message.", e);
+
+            return;
+        }
+
+        DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes, forceLocDep);
+
+        try {
+            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (ctx.discovery().alive(nodeId))
+                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+            else if (log.isDebugEnabled())
+                log.debug("Node has left the grid: " + nodeId);
+        }
+    }
+
+    /** {@inheritDoc} Prints the number of currently registered streamers. */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>>   ldrsSize: " + ldrs.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
new file mode 100644
index 0000000..26f4965
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Message carrying a batch of streamer entries together with the marshalled updater and deployment info.
+ */
+public class DataStreamerRequest implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private long reqId;
+
+    /** Marshalled response topic. */
+    private byte[] resTopicBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Marshalled cache updater. */
+    private byte[] updaterBytes;
+
+    /** Entries to update. */
+    @GridDirectCollection(DataStreamerEntry.class)
+    private Collection<DataStreamerEntry> entries;
+
+    /** {@code True} to ignore deployment ownership. */
+    private boolean ignoreDepOwnership;
+
+    /** Skip store flag. */
+    private boolean skipStore;
+
+    /** Deployment mode. */
+    private DeploymentMode depMode;
+
+    /** Sample class name. */
+    private String sampleClsName;
+
+    /** User version. */
+    private String userVer;
+
+    /** Node class loader participants. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+    private Map<UUID, IgniteUuid> ldrParticipants;
+
+    /** Class loader ID. */
+    private IgniteUuid clsLdrId;
+
+    /** {@code True} to force local deployment. */
+    private boolean forceLocDep;
+
+    /**
+     * Empty constructor; presumably used to create the instance that {@code readFrom} then populates.
+     */
+    public DataStreamerRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param resTopicBytes Marshalled response topic.
+     * @param cacheName Cache name.
+     * @param updaterBytes Marshalled cache updater.
+     * @param entries Entries to put.
+     * @param ignoreDepOwnership Ignore ownership.
+     * @param skipStore Skip store flag.
+     * @param depMode Deployment mode.
+     * @param sampleClsName Sample class name.
+     * @param userVer User version.
+     * @param ldrParticipants Loader participants.
+     * @param clsLdrId Class loader ID.
+     * @param forceLocDep Force local deployment.
+     */
+    public DataStreamerRequest(long reqId,
+        byte[] resTopicBytes,
+        @Nullable String cacheName,
+        byte[] updaterBytes,
+        Collection<DataStreamerEntry> entries,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        DeploymentMode depMode,
+        String sampleClsName,
+        String userVer,
+        Map<UUID, IgniteUuid> ldrParticipants,
+        IgniteUuid clsLdrId,
+        boolean forceLocDep) {
+        this.reqId = reqId;
+        this.resTopicBytes = resTopicBytes;
+        this.cacheName = cacheName;
+        this.updaterBytes = updaterBytes;
+        this.entries = entries;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.depMode = depMode;
+        this.sampleClsName = sampleClsName;
+        this.userVer = userVer;
+        this.ldrParticipants = ldrParticipants;
+        this.clsLdrId = clsLdrId;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Marshalled response topic.
+     */
+    public byte[] responseTopicBytes() {
+        return resTopicBytes;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Marshalled updater.
+     */
+    public byte[] updaterBytes() {
+        return updaterBytes;
+    }
+
+    /**
+     * @return Entries to update.
+     */
+    public Collection<DataStreamerEntry> entries() {
+        return entries;
+    }
+
+    /**
+     * @return {@code True} to ignore ownership.
+     */
+    public boolean ignoreDeploymentOwnership() {
+        return ignoreDepOwnership;
+    }
+
+    /**
+     * @return Skip store flag.
+     */
+    public boolean skipStore() {
+        return skipStore;
+    }
+
+    /**
+     * @return Deployment mode.
+     */
+    public DeploymentMode deploymentMode() {
+        return depMode;
+    }
+
+    /**
+     * @return Sample class name.
+     */
+    public String sampleClassName() {
+        return sampleClsName;
+    }
+
+    /**
+     * @return User version.
+     */
+    public String userVersion() {
+        return userVer;
+    }
+
+    /**
+     * @return Node class loader participants.
+     */
+    public Map<UUID, IgniteUuid> participants() {
+        return ldrParticipants;
+    }
+
+    /**
+     * @return Class loader ID.
+     */
+    public IgniteUuid classLoaderId() {
+        return clsLdrId;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerRequest.class, this);
+    }
+
+    /** {@inheritDoc} Writes fields in alphabetical name order, resuming from {@code writer.state()}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeString("cacheName", cacheName))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeString("sampleClsName", sampleClsName))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeString("userVer", userVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads fields in the same order as {@code writeTo}, resuming from {@code reader.state()}. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cacheName = reader.readString("cacheName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                clsLdrId = reader.readIgniteUuid("clsLdrId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                byte depModeOrd;
+
+                depModeOrd = reader.readByte("depMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                depMode = DeploymentMode.fromOrdinal(depModeOrd);
+
+                reader.incrementState();
+
+            case 3:
+                entries = reader.readCollection("entries", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                resTopicBytes = reader.readByteArray("resTopicBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                sampleClsName = reader.readString("sampleClsName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                userVer = reader.readString("userVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 62;
+    }
+
+    /** {@inheritDoc} Equals the number of fields handled in {@code writeTo} and {@code readFrom}. */
+    @Override public byte fieldsCount() {
+        return 13;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
new file mode 100644
index 0000000..8aee0d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+
+/**
+ * Response to a {@link DataStreamerRequest}: request ID, optional marshalled error and local deployment flag.
+ */
+public class DataStreamerResponse implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private long reqId;
+
+    /** Marshalled error bytes ({@code null} when no error is reported). */
+    private byte[] errBytes;
+
+    /** {@code True} to force local deployment. */
+    private boolean forceLocDep;
+
+    /**
+     * @param reqId Request ID.
+     * @param errBytes Error bytes.
+     * @param forceLocDep Force local deployment.
+     */
+    public DataStreamerResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
+        this.reqId = reqId;
+        this.errBytes = errBytes;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * Empty constructor; presumably used to create the instance that {@code readFrom} then populates.
+     */
+    public DataStreamerResponse() {
+        // No-op.
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Error bytes.
+     */
+    public byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DataStreamerResponse.class, this);
+    }
+
+    /** {@inheritDoc} Writes fields in alphabetical name order, resuming from {@code writer.state()}. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads fields in the same order as {@code writeTo}, resuming from {@code reader.state()}. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 63;
+    }
+
+    /** {@inheritDoc} Equals the number of fields handled in {@code writeTo} and {@code readFrom}. */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
new file mode 100644
index 0000000..1faf22d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class DataStreamerUpdateJob implements GridPlainCallable<Object> {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Entries to put. */
+    private final Collection<DataStreamerEntry> col;
+
+    /** {@code True} to ignore deployment ownership. */
+    private final boolean ignoreDepOwnership;
+
+    /** Skip store flag. */
+    private final boolean skipStore;
+
+    /** Updater that applies the entries to the cache. */
+    private final IgniteDataStreamer.Updater updater;
+
+    /**
+     * @param ctx Context.
+     * @param log Log.
+     * @param cacheName Cache name.
+     * @param col Entries to put.
+     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param skipStore Skip store flag.
+     * @param updater Updater.
+     */
+    DataStreamerUpdateJob(
+        GridKernalContext ctx,
+        IgniteLogger log,
+        @Nullable String cacheName,
+        Collection<DataStreamerEntry> col,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDataStreamer.Updater<?, ?> updater) {
+        this.ctx = ctx;
+        this.log = log;
+
+        assert col != null && !col.isEmpty();
+        assert updater != null;
+
+        this.cacheName = cacheName;
+        this.col = col;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} Finishes unmarshalling of entries, then passes them to the updater; returns {@code null}. */
+    @SuppressWarnings("unchecked")
+    @Override public Object call() throws Exception {
+        if (log.isDebugEnabled())
+            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+
+//        TODO IGNITE-77: restore adapter usage.
+//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+//
+//        IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+//        if (!f.isDone())
+//            f.get();
+//
+//        if (ignoreDepOwnership)
+//            cache.context().deploy().ignoreOwnership(true);
+
+        IgniteCacheProxy cache = ctx.cache().jcache(cacheName);
+
+        if (skipStore)
+            cache = (IgniteCacheProxy<?, ?>)cache.withSkipStore();
+
+        if (ignoreDepOwnership)
+            cache.context().deploy().ignoreOwnership(true);
+
+        try {
+            final GridCacheContext cctx = cache.context();
+
+            for (DataStreamerEntry e : col) {
+                e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+
+                CacheObject val = e.getValue();
+
+                if (val != null)
+                    val.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+            }
+
+            if (unwrapEntries()) {
+                Collection<Map.Entry> col0 = F.viewReadOnly(col, new C1<DataStreamerEntry, Map.Entry>() {
+                    @Override public Map.Entry apply(DataStreamerEntry e) {
+                        return e.toEntry(cctx);
+                    }
+                });
+
+                updater.update(cache, col0);
+            }
+            else
+                updater.update(cache, col);
+
+            return null;
+        }
+        finally {
+            if (ignoreDepOwnership)
+                cache.context().deploy().ignoreOwnership(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Update job finished on node: " + ctx.localNodeId());
+        }
+    }
+
+    /**
+     * @return {@code True} if entries must be converted to {@code Map.Entry} (updater is not an internal one).
+     */
+    private boolean unwrapEntries() {
+        return !(updater instanceof DataStreamerCacheUpdaters.InternalUpdater);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html
new file mode 100644
index 0000000..1090b86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Data streamer processor.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
index 9ceeb0d..af8b088 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.dr.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.datastream.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 378ef6b..6f7cb92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.datastream.*;
+import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java
deleted file mode 100644
index 9f6356b..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerImplSelfTest.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Tests for {@code IgniteDataStreamerImpl}.
- */
-public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of keys to load via data streamer. */
-    private static final int KEYS_COUNT = 1000;
-
-    /** Started grid counter. */
-    private static int cnt;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        // Forth node goes without cache.
-        if (cnt < 4)
-            cfg.setCacheConfiguration(cacheConfiguration());
-
-        cnt++;
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
-        try {
-            startGrids(5);
-
-            final CyclicBarrier barrier = new CyclicBarrier(2);
-
-            multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    U.awaitQuiet(barrier);
-
-                    G.stopAll(true);
-
-                    return null;
-                }
-            }, 1);
-
-            Ignite g4 = grid(4);
-
-            IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
-
-            dataLdr.perNodeBufferSize(32);
-
-            for (int i = 0; i < 100000; i += 2) {
-                dataLdr.addData(i, i);
-                dataLdr.removeData(i + 1);
-            }
-
-            U.awaitQuiet(barrier);
-
-            info("Closing data streamer.");
-
-            try {
-                dataLdr.close(true);
-            }
-            catch (IllegalStateException ignore) {
-                // This is ok to ignore this exception as test is racy by it's nature -
-                // grid is stopping in different thread.
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Data streamer should correctly load entries from HashMap in case of grids with more than one node
-     *  and with GridOptimizedMarshaller that requires serializable.
-     *
-     * @throws Exception If failed.
-     */
-    public void testAddDataFromMap() throws Exception {
-        try {
-            cnt = 0;
-
-            startGrids(2);
-
-            Ignite g0 = grid(0);
-
-            IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
-
-            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
-
-            for (int i = 0; i < KEYS_COUNT; i ++)
-                map.put(i, String.valueOf(i));
-
-            dataLdr.addData(map);
-
-            dataLdr.close();
-
-            Random rnd = new Random();
-
-            IgniteCache<Integer, String> c = g0.jcache(null);
-
-            for (int i = 0; i < KEYS_COUNT; i ++) {
-                Integer k = rnd.nextInt(KEYS_COUNT);
-
-                String v = c.get(k);
-
-                assertEquals(k.toString(), v);
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Gets cache configuration.
-     *
-     * @return Cache configuration.
-     */
-    private CacheConfiguration cacheConfiguration() {
-        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-        cacheCfg.setCacheMode(PARTITIONED);
-        cacheCfg.setBackups(1);
-        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        return cacheCfg;
-    }
-
-    /**
-     *
-     */
-    private static class TestObject implements Serializable {
-        /** */
-        private int val;
-
-        /**
-         */
-        private TestObject() {
-            // No-op.
-        }
-
-        /**
-         * @param val Value.
-         */
-        private TestObject(int val) {
-            this.val = val;
-        }
-
-        public Integer val() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj instanceof TestObject && ((TestObject)obj).val == val;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java
deleted file mode 100644
index 170875c..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/DataStreamerProcessorSelfTest.java
+++ /dev/null
@@ -1,970 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- *
- */
-public class DataStreamerProcessorSelfTest extends GridCommonAbstractTest {
-    /** */
-    private static ConcurrentHashMap<Object, Object> storeMap;
-
-    /** */
-    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private CacheMode mode = PARTITIONED;
-
-    /** */
-    private boolean nearEnabled = true;
-
-    /** */
-    private boolean useCache;
-
-    /** */
-    private TestStore store;
-
-    /** {@inheritDoc} */
-    @Override public void afterTest() throws Exception {
-        super.afterTest();
-
-        useCache = false;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(spi);
-
-        cfg.setIncludeProperties();
-
-        cfg.setMarshaller(new OptimizedMarshaller(false));
-
-        if (useCache) {
-            CacheConfiguration cc = defaultCacheConfiguration();
-
-            cc.setCacheMode(mode);
-            cc.setAtomicityMode(TRANSACTIONAL);
-            cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-
-            cc.setEvictSynchronized(false);
-            cc.setEvictNearSynchronized(false);
-
-            if (store != null) {
-                cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
-                cc.setReadThrough(true);
-                cc.setWriteThrough(true);
-            }
-
-            cfg.setCacheConfiguration(cc);
-        }
-        else
-            cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitioned() throws Exception {
-        mode = PARTITIONED;
-
-        checkDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testColocated() throws Exception {
-        mode = PARTITIONED;
-        nearEnabled = false;
-
-        checkDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicated() throws Exception {
-        mode = REPLICATED;
-
-        checkDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLocal() throws Exception {
-        mode = LOCAL;
-
-        try {
-            checkDataStreamer();
-
-            assert false;
-        }
-        catch (IgniteCheckedException e) {
-            // Cannot load local cache configured remotely.
-            info("Caught expected exception: " + e);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ErrorNotRethrown")
-    private void checkDataStreamer() throws Exception {
-        try {
-            Ignite g1 = startGrid(1);
-
-            useCache = true;
-
-            Ignite g2 = startGrid(2);
-            startGrid(3);
-
-            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
-            ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
-            final AtomicInteger idxGen = new AtomicInteger();
-            final int cnt = 400;
-            final int threads = 10;
-
-            final CountDownLatch l1 = new CountDownLatch(threads);
-
-            IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
-                    for (int i = 0; i < cnt; i++) {
-                        int idx = idxGen.getAndIncrement();
-
-                        futs.add(ldr.addData(idx, idx));
-                    }
-
-                    l1.countDown();
-
-                    for (IgniteFuture<?> fut : futs)
-                        fut.get();
-
-                    return null;
-                }
-            }, threads);
-
-            l1.await();
-
-            // This will wait until data streamer finishes loading.
-            stopGrid(getTestGridName(1), false);
-
-            f1.get();
-
-            int s2 = internalCache(2).primaryKeySet().size();
-            int s3 = internalCache(3).primaryKeySet().size();
-            int total = threads * cnt;
-
-            assertEquals(total, s2 + s3);
-
-            final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
-
-            rmvLdr.updater(DataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
-            final CountDownLatch l2 = new CountDownLatch(threads);
-
-            IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
-                    for (int i = 0; i < cnt; i++) {
-                        final int key = idxGen.decrementAndGet();
-
-                        futs.add(rmvLdr.removeData(key));
-                    }
-
-                    l2.countDown();
-
-                    for (IgniteFuture<?> fut : futs)
-                        fut.get();
-
-                    return null;
-                }
-            }, threads);
-
-            l2.await();
-
-            rmvLdr.close(false);
-
-            f2.get();
-
-            s2 = internalCache(2).primaryKeySet().size();
-            s3 = internalCache(3).primaryKeySet().size();
-
-            assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedIsolated() throws Exception {
-        mode = PARTITIONED;
-
-        checkIsolatedDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicatedIsolated() throws Exception {
-        mode = REPLICATED;
-
-        checkIsolatedDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkIsolatedDataStreamer() throws Exception {
-        try {
-            useCache = true;
-
-            Ignite g1 = startGrid(0);
-            startGrid(1);
-            startGrid(2);
-
-            awaitPartitionMapExchange();
-
-            IgniteCache<Integer, Integer> cache = grid(0).jcache(null);
-
-            for (int i = 0; i < 100; i++)
-                cache.put(i, -1);
-
-            final int cnt = 40_000;
-            final int threads = 10;
-
-            try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) {
-                final AtomicInteger idxGen = new AtomicInteger();
-
-                IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        for (int i = 0; i < cnt; i++) {
-                            int idx = idxGen.getAndIncrement();
-
-                            ldr.addData(idx, idx);
-                        }
-
-                        return null;
-                    }
-                }, threads);
-
-                f1.get();
-            }
-
-            for (int g = 0; g < 3; g++) {
-                ClusterNode locNode = grid(g).localNode();
-
-                GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null);
-
-                if (cache0.isNear())
-                    cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht();
-
-                CacheAffinity<Integer> aff = cache0.affinity();
-
-                for (int key = 0; key < cnt * threads; key++) {
-                    if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) {
-                        GridCacheEntryEx entry = cache0.peekEx(key);
-
-                        assertNotNull("Missing entry for key: " + key, entry);
-                        assertEquals((key < 100 ? -1 : key),
-                            CU.value(entry.rawGetOrUnmarshal(false), cache0.context(), false));
-                    }
-                }
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Test primitive arrays can be passed into data streamer.
-     *
-     * @throws Exception If failed.
-     */
-    public void testPrimitiveArrays() throws Exception {
-        try {
-            useCache = true;
-            mode = PARTITIONED;
-
-            Ignite g1 = startGrid(1);
-            startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
-
-            List<Object> arrays = Arrays.<Object>asList(
-                new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
-                new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
-
-            IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
-
-            for (int i = 0, size = arrays.size(); i < 1000; i++) {
-                Object arr = arrays.get(i % size);
-
-                dataLdr.addData(i, arr);
-                dataLdr.addData(i, fixedClosure(arr));
-            }
-
-            dataLdr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicatedMultiThreaded() throws Exception {
-        mode = REPLICATED;
-
-        checkLoaderMultithreaded(1, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedMultiThreaded() throws Exception {
-        mode = PARTITIONED;
-
-        checkLoaderMultithreaded(1, 3);
-    }
-
-    /**
-     * Tests loader in multithreaded environment with various count of grids started.
-     *
-     * @param nodesCntNoCache How many nodes should be started without cache.
-     * @param nodesCntCache How many nodes should be started with cache.
-     * @throws Exception If failed.
-     */
-    protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
-        throws Exception {
-        try {
-            // Start all required nodes.
-            int idx = 1;
-
-            for (int i = 0; i < nodesCntNoCache; i++)
-                startGrid(idx++);
-
-            useCache = true;
-
-            for (int i = 0; i < nodesCntCache; i++)
-                startGrid(idx++);
-
-            Ignite g1 = grid(1);
-
-            // Get and configure loader.
-            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
-            ldr.updater(DataStreamerCacheUpdaters.<Integer, Integer>individual());
-            ldr.perNodeBufferSize(2);
-
-            // Define count of puts.
-            final AtomicInteger idxGen = new AtomicInteger();
-
-            final AtomicBoolean done = new AtomicBoolean();
-
-            try {
-                final int totalPutCnt = 50000;
-
-                IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        Collection<IgniteFuture<?>> futs = new ArrayList<>();
-
-                        while (!done.get()) {
-                            int idx = idxGen.getAndIncrement();
-
-                            if (idx >= totalPutCnt) {
-                                info(">>> Stopping producer thread since maximum count of puts reached.");
-
-                                break;
-                            }
-
-                            futs.add(ldr.addData(idx, idx));
-                        }
-
-                        ldr.flush();
-
-                        for (IgniteFuture<?> fut : futs)
-                            fut.get();
-
-                        return null;
-                    }
-                }, 5, "producer");
-
-                IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        while (!done.get()) {
-                            ldr.flush();
-
-                            U.sleep(100);
-                        }
-
-                        return null;
-                    }
-                }, 1, "flusher");
-
-                // Define index of node being restarted.
-                final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
-
-                IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        try {
-                            for (int i = 0; i < 5; i++) {
-                                Ignite g = startGrid(restartNodeIdx);
-
-                                UUID id = g.cluster().localNode().id();
-
-                                info(">>>>>>> Started node: " + id);
-
-                                U.sleep(1000);
-
-                                stopGrid(getTestGridName(restartNodeIdx), true);
-
-                                info(">>>>>>> Stopped node: " + id);
-                            }
-                        }
-                        finally {
-                            done.set(true);
-
-                            info("Start stop thread finished.");
-                        }
-
-                        return null;
-                    }
-                }, 1, "start-stop-thread");
-
-                fut1.get();
-                fut2.get();
-                fut3.get();
-            }
-            finally {
-                ldr.close(false);
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLoaderApi() throws Exception {
-        useCache = true;
-
-        try {
-            Ignite g1 = startGrid(1);
-
-            IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
-
-            ldr.close(false);
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-
-            try {
-                // Create another loader.
-                ldr = g1.dataStreamer("UNKNOWN_CACHE");
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            ldr.close(true);
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-
-            // Create another loader.
-            ldr = g1.dataStreamer(null);
-
-            // Cancel with future.
-            ldr.future().cancel();
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            try {
-                ldr.future().get();
-
-                assert false;
-            }
-            catch (IgniteFutureCancelledException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            // Create another loader.
-            ldr = g1.dataStreamer(null);
-
-            // This will close loader.
-            stopGrid(getTestGridName(1), false);
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Wraps integer to closure returning it.
-     *
-     * @param i Value to wrap.
-     * @return Callable.
-     */
-    private static Callable<Integer> callable(@Nullable final Integer i) {
-        return new Callable<Integer>() {
-            @Override public Integer call() throws Exception {
-                return i;
-            }
-        };
-    }
-
-    /**
-     * Wraps integer to closure returning it.
-     *
-     * @param i Value to wrap.
-     * @return Closure.
-     */
-    private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
-        return new IgniteClosure<Integer, Integer>() {
-            @Override public Integer apply(Integer e) {
-                return e == null ? i : e + i;
-            }
-        };
-    }
-
-    /**
-     * Wraps object to closure returning it.
-     *
-     * @param obj Value to wrap.
-     * @return Closure.
-     */
-    private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
-        return new IgniteClosure<T, T>() {
-            @Override public T apply(T e) {
-                assert e == null || obj == null || e.getClass() == obj.getClass() :
-                    "Expects the same types [e=" + e + ", obj=" + obj + ']';
-
-                return obj;
-            }
-        };
-    }
-
-    /**
-     * Wraps integer to closure expecting it and returning {@code null}.
-     *
-     * @param exp Expected closure value.
-     * @return Remove expected cache value closure.
-     */
-    private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
-        return new IgniteClosure<T, T>() {
-            @Override public T apply(T act) {
-                if (exp == null ? act == null : exp.equals(act))
-                    return null;
-
-                throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
-            }
-        };
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFlush() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            final IgniteCache<Integer, Integer> c = g.jcache(null);
-
-            final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
-            ldr.perNodeBufferSize(10);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.localSize() == 0);
-
-            multithreaded(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    ldr.flush();
-
-                    assertEquals(9, c.size());
-
-                    return null;
-                }
-            }, 5, "flush-checker");
-
-            ldr.addData(100, 100);
-
-            ldr.flush();
-
-            assertEquals(10, c.size());
-
-            ldr.addData(200, 200);
-
-            ldr.close(false);
-
-            ldr.future().get();
-
-            assertEquals(11, c.size());
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTryFlush() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            IgniteCache<Integer, Integer> c = g.jcache(null);
-
-            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
-            ldr.perNodeBufferSize(10);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.localSize() == 0);
-
-            ldr.tryFlush();
-
-            Thread.sleep(100);
-
-            assertEquals(9, c.size());
-
-            ldr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFlushTimeout() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            final CountDownLatch latch = new CountDownLatch(9);
-
-            g.events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    latch.countDown();
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_PUT);
-
-            IgniteCache<Integer, Integer> c = g.jcache(null);
-
-            assertTrue(c.localSize() == 0);
-
-            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
-            ldr.perNodeBufferSize(10);
-            ldr.autoFlushFrequency(3000);
-            ldr.allowOverwrite(true);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.localSize() == 0);
-
-            assertFalse(latch.await(1000, MILLISECONDS));
-
-            assertTrue(c.localSize() == 0);
-
-            assertTrue(latch.await(3000, MILLISECONDS));
-
-            assertEquals(9, c.size());
-
-            ldr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdateStore() throws Exception {
-        storeMap = new ConcurrentHashMap<>();
-
-        try {
-            store = new TestStore();
-
-            useCache = true;
-
-            Ignite ignite = startGrid(1);
-
-            startGrid(2);
-            startGrid(3);
-
-            for (int i = 0; i < 1000; i++)
-                storeMap.put(i, i);
-
-            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
-                ldr.allowOverwrite(true);
-
-                assertFalse(ldr.skipStore());
-
-                for (int i = 0; i < 1000; i++)
-                    ldr.removeData(i);
-
-                for (int i = 1000; i < 2000; i++)
-                    ldr.addData(i, i);
-            }
-
-            for (int i = 0; i < 1000; i++)
-                assertNull(storeMap.get(i));
-
-            for (int i = 1000; i < 2000; i++)
-                assertEquals(i, storeMap.get(i));
-
-            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
-                ldr.allowOverwrite(true);
-
-                ldr.skipStore(true);
-
-                for (int i = 0; i < 1000; i++)
-                    ldr.addData(i, i);
-
-                for (int i = 1000; i < 2000; i++)
-                    ldr.removeData(i);
-            }
-
-            IgniteCache<Object, Object> cache = ignite.jcache(null);
-
-            for (int i = 0; i < 1000; i++) {
-                assertNull(storeMap.get(i));
-
-                assertEquals(i, cache.get(i));
-            }
-
-            for (int i = 1000; i < 2000; i++) {
-                assertEquals(i, storeMap.get(i));
-
-                assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
-            }
-        }
-        finally {
-            storeMap = null;
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCustomUserUpdater() throws Exception {
-        useCache = true;
-
-        try {
-            Ignite ignite = startGrid(1);
-
-            startGrid(2);
-            startGrid(3);
-
-            try (IgniteDataStreamer<String, TestObject> ldr = ignite.dataStreamer(null)) {
-                ldr.allowOverwrite(true);
-
-                ldr.updater(new IgniteDataStreamer.Updater<String, TestObject>() {
-                    @Override public void update(IgniteCache<String, TestObject> cache,
-                        Collection<Map.Entry<String, TestObject>> entries) {
-                        for (Map.Entry<String, TestObject> e : entries) {
-                            assertTrue(e.getKey() instanceof String);
-                            assertTrue(e.getValue() instanceof TestObject);
-
-                            cache.put(e.getKey(), new TestObject(e.getValue().val + 1));
-                        }
-                    }
-                });
-
-                for (int i = 0; i < 100; i++)
-                    ldr.addData(String.valueOf(i), new TestObject(i));
-            }
-
-            IgniteCache<String, TestObject> cache = ignite.jcache(null);
-
-            for (int i = 0; i < 100; i++) {
-                TestObject val = cache.get(String.valueOf(i));
-
-                assertNotNull(val);
-                assertEquals(i + 1, val.val);
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestObject {
-        /** Value. */
-        private final int val;
-
-        /**
-         * @param val Value.
-         */
-        private TestObject(int val) {
-            this.val = val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            TestObject obj = (TestObject)o;
-
-            return val == obj.val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestStore extends CacheStoreAdapter<Object, Object> {
-        /** {@inheritDoc} */
-        @Nullable @Override public Object load(Object key) {
-            return storeMap.get(key);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Cache.Entry<?, ?> entry) {
-            storeMap.put(entry.getKey(), entry.getValue());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void delete(Object key) {
-            storeMap.remove(key);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
deleted file mode 100644
index 053228c..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastream;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jdk8.backport.*;
-
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Data streamer performance test. Compares group lock data streamer to traditional lock.
- * <p>
- * Disable assertions and give at least 2 GB heap to run this test.
- */
-public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int GRID_CNT = 3;
-
-    /** */
-    private static final int ENTRY_CNT = 80000;
-
-    /** */
-    private boolean useCache;
-
-    /** */
-    private String[] vals = new String[2048];
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(spi);
-
-        cfg.setIncludeProperties();
-
-        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
-        cfg.setConnectorConfiguration(null);
-
-        cfg.setPeerClassLoadingEnabled(true);
-
-        if (useCache) {
-            CacheConfiguration cc = defaultCacheConfiguration();
-
-            cc.setCacheMode(PARTITIONED);
-
-            cc.setDistributionMode(PARTITIONED_ONLY);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-            cc.setStartSize(ENTRY_CNT / GRID_CNT);
-            cc.setSwapEnabled(false);
-
-            cc.setBackups(1);
-
-            cfg.setCacheSanityCheckEnabled(false);
-            cfg.setCacheConfiguration(cc);
-        }
-        else
-            cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        for (int i = 0; i < vals.length; i++) {
-            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
-
-            StringBuilder sb = new StringBuilder();
-
-            for (int j = 0; j < valLen; j++)
-                sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
-
-            vals[i] = sb.toString();
-
-            info("Value: " + vals[i]);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPerformance() throws Exception {
-        doTest();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void doTest() throws Exception {
-        System.gc();
-        System.gc();
-        System.gc();
-
-        try {
-            useCache = true;
-
-            startGridsMultiThreaded(GRID_CNT);
-
-            useCache = false;
-
-            Ignite ignite = startGrid();
-
-            final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
-
-            ldr.perNodeBufferSize(8192);
-            ldr.updater(DataStreamerCacheUpdaters.<Integer, String>batchedSorted());
-            ldr.autoFlushFrequency(0);
-
-            final LongAdder cnt = new LongAdder();
-
-            long start = U.currentTimeMillis();
-
-            Thread t = new Thread(new Runnable() {
-                @SuppressWarnings("BusyWait")
-                @Override public void run() {
-                    while (true) {
-                        try {
-                            Thread.sleep(10000);
-                        }
-                        catch (InterruptedException ignored) {
-                            break;
-                        }
-
-                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
-                    }
-                }
-            });
-
-            t.setDaemon(true);
-
-            t.start();
-
-            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
-
-            multithreaded(new Callable<Object>() {
-                @SuppressWarnings("InfiniteLoopStatement")
-                @Override public Object call() throws Exception {
-                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
-
-                    while (true) {
-                        int i = rnd.nextInt(ENTRY_CNT);
-
-                        ldr.addData(i, vals[rnd.nextInt(vals.length)]);
-
-                        cnt.increment();
-                    }
-                }
-            }, threadNum, "loader");
-
-            info("Closing loader...");
-
-            ldr.close(false);
-
-            long duration = U.currentTimeMillis() - start;
-
-            info("Finished performance test. Duration: " + duration + "ms.");
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/60a1a481/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
new file mode 100644
index 0000000..98737a1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests for {@code IgniteDataStreamerImpl}.
+ */
+public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of keys to load via data streamer. */
+    private static final int KEYS_COUNT = 1000;
+
+    /** Started grid counter. */
+    private static int cnt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Only the first four nodes get a cache; later nodes go without one.
+        if (cnt < 4)
+            cfg.setCacheConfiguration(cacheConfiguration());
+
+        cnt++;
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
+        try {
+            startGrids(5);
+
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+
+            multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    U.awaitQuiet(barrier);
+
+                    G.stopAll(true);
+
+                    return null;
+                }
+            }, 1);
+
+            Ignite g4 = grid(4);
+
+            IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
+
+            dataLdr.perNodeBufferSize(32);
+
+            for (int i = 0; i < 100000; i += 2) {
+                dataLdr.addData(i, i);
+                dataLdr.removeData(i + 1);
+            }
+
+            U.awaitQuiet(barrier);
+
+            info("Closing data streamer.");
+
+            try {
+                dataLdr.close(true);
+            }
+            catch (IllegalStateException ignore) {
+                // It is ok to ignore this exception as the test is racy by its nature -
+                // grid is stopping in different thread.
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Data streamer should correctly load entries from HashMap in case of grids with more than one node
+     *  and with GridOptimizedMarshaller that requires serializable.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAddDataFromMap() throws Exception {
+        try {
+            cnt = 0;
+
+            startGrids(2);
+
+            Ignite g0 = grid(0);
+
+            IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
+
+            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+
+            for (int i = 0; i < KEYS_COUNT; i ++)
+                map.put(i, String.valueOf(i));
+
+            dataLdr.addData(map);
+
+            dataLdr.close();
+
+            Random rnd = new Random();
+
+            IgniteCache<Integer, String> c = g0.jcache(null);
+
+            for (int i = 0; i < KEYS_COUNT; i ++) {
+                Integer k = rnd.nextInt(KEYS_COUNT);
+
+                String v = c.get(k);
+
+                assertEquals(k.toString(), v);
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Gets cache configuration.
+     *
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(1);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cacheCfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestObject implements Serializable {
+        /** */
+        private int val;
+
+        /**
+         */
+        private TestObject() {
+            // No-op.
+        }
+
+        /**
+         * @param val Value.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        public Integer val() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof TestObject && ((TestObject)obj).val == val;
+        }
+    }
+}


Mime
View raw message