Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6F80417706 for ; Tue, 3 Mar 2015 13:56:23 +0000 (UTC) Received: (qmail 53946 invoked by uid 500); 3 Mar 2015 13:56:20 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 53913 invoked by uid 500); 3 Mar 2015 13:56:20 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 53902 invoked by uid 99); 3 Mar 2015 13:56:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 13:56:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 03 Mar 2015 13:55:49 +0000 Received: (qmail 49778 invoked by uid 99); 3 Mar 2015 13:55:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Mar 2015 13:55:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 31C89E07F3; Tue, 3 Mar 2015 13:55:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Message-Id: <62dfbfd603ad44ffbd92f274d689ba2e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ignite git commit: # gg-9869: rename GridDataLoaderProcessor -> IgniteDataStreamerProcessor + tests Date: Tue, 3 Mar 2015 13:55:45 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-394 d06158309 -> b27e1f3db # gg-9869: rename GridDataLoaderProcessor -> IgniteDataStreamerProcessor + tests Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b27e1f3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b27e1f3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b27e1f3d Branch: refs/heads/ignite-394 Commit: b27e1f3db751999f379fd60dd96380e77b65be4e Parents: d061583 Author: Artem Shutak Authored: Tue Mar 3 16:56:08 2015 +0300 Committer: Artem Shutak Committed: Tue Mar 3 16:56:08 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 2 +- .../ignite/internal/GridKernalContextImpl.java | 10 +- .../apache/ignite/internal/IgniteKernal.java | 2 +- .../dataload/GridDataLoaderProcessor.java | 316 ------- .../dataload/IgniteDataStreamerProcessor.java | 316 +++++++ .../GridDataLoaderProcessorSelfTest.java | 924 ------------------- .../IgniteDataStreamerProcessorSelfTest.java | 924 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 2 +- 8 files changed, 1248 insertions(+), 1248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index cb9ffa1..c497c06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -247,7 +247,7 @@ public interface GridKernalContext extends Iterable { * * @return Data loader processor. */ - public GridDataLoaderProcessor dataLoad(); + public IgniteDataStreamerProcessor dataLoad(); /** * Gets file system processor. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 756c16a..71a357e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -202,7 +202,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude - private GridDataLoaderProcessor dataLdrProc; + private IgniteDataStreamerProcessor dataLdrProc; /** */ @GridToStringInclude @@ -446,8 +446,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable affProc = (GridAffinityProcessor)comp; else if (comp instanceof GridRestProcessor) restProc = (GridRestProcessor)comp; - else if (comp instanceof GridDataLoaderProcessor) - dataLdrProc = (GridDataLoaderProcessor)comp; + else if (comp instanceof IgniteDataStreamerProcessor) + dataLdrProc = (IgniteDataStreamerProcessor)comp; else if (comp instanceof IgfsProcessorAdapter) igfsProc = (IgfsProcessorAdapter)comp; else if (comp instanceof GridOffHeapProcessor) @@ -660,8 +660,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridDataLoaderProcessor dataLoad() { - return (GridDataLoaderProcessor)dataLdrProc; + @Override public IgniteDataStreamerProcessor dataLoad() { + return (IgniteDataStreamerProcessor)dataLdrProc; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 336f872..2299f3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -753,7 +753,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(ctx, new GridTaskProcessor(ctx), attrs); startProcessor(ctx, (GridProcessor)SCHEDULE.createOptional(ctx), attrs); startProcessor(ctx, new GridRestProcessor(ctx), attrs); - startProcessor(ctx, new GridDataLoaderProcessor(ctx), attrs); + startProcessor(ctx, new IgniteDataStreamerProcessor(ctx), attrs); startProcessor(ctx, new GridStreamProcessor(ctx), attrs); startProcessor(ctx, (GridProcessor) IGFS.create(ctx, F.isEmpty(cfg.getIgfsConfiguration())), attrs); startProcessor(ctx, new GridContinuousProcessor(ctx), attrs); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java deleted file mode 100644 index b29c9ea..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.thread.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.GridTopic.*; -import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; - -/** - * - */ -public class GridDataLoaderProcessor extends GridProcessorAdapter { - /** Loaders map (access is not supposed to be highly concurrent). */ - private Collection ldrs = new GridConcurrentHashSet<>(); - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** Flushing thread. */ - private Thread flusher; - - /** */ - private final DelayQueue> flushQ = new DelayQueue<>(); - - /** Marshaller. */ - private final Marshaller marsh; - - /** - * @param ctx Kernal context. - */ - public GridDataLoaderProcessor(GridKernalContext ctx) { - super(ctx); - - ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - assert msg instanceof GridDataLoadRequest; - - processDataLoadRequest(nodeId, (GridDataLoadRequest)msg); - } - }); - - marsh = ctx.config().getMarshaller(); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (ctx.config().isDaemon()) - return; - - flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { - @Override protected void body() throws InterruptedException { - while (!isCancelled()) { - IgniteDataStreamerImpl ldr = flushQ.take(); - - if (!busyLock.enterBusy()) - return; - - try { - if (ldr.isClosed()) - continue; - - ldr.tryFlush(); - - flushQ.offer(ldr); - } - finally { - busyLock.leaveBusy(); - } - } - } - }); - - flusher.start(); - - if (log.isDebugEnabled()) - log.debug("Started data loader processor."); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (ctx.config().isDaemon()) - return; - - ctx.io().removeMessageListener(TOPIC_DATALOAD); - - busyLock.block(); - - U.interrupt(flusher); - U.join(flusher, log); - - for (IgniteDataStreamerImpl ldr : ldrs) { - if (log.isDebugEnabled()) - log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); - - try { - ldr.closeEx(cancel); - } - catch (IgniteInterruptedCheckedException e) { - U.warn(log, "Interrupted while waiting for completion of the data loader: " + ldr, e); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to close data loader: " + ldr, e); - } - } - - if (log.isDebugEnabled()) - log.debug("Stopped data loader processor."); - } - - /** - * @param cacheName Cache name ({@code null} for default cache). - * @param compact {@code true} if data loader should transfer data in compact format. - * @return Data loader. - */ - public IgniteDataStreamerImpl dataLoader(@Nullable String cacheName, boolean compact) { - if (!busyLock.enterBusy()) - throw new IllegalStateException("Failed to create data loader (grid is stopping)."); - - try { - final IgniteDataStreamerImpl ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact); - - ldrs.add(ldr); - - ldr.internalFuture().listenAsync(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - boolean b = ldrs.remove(ldr); - - assert b : "Loader has not been added to set: " + ldr; - - if (log.isDebugEnabled()) - log.debug("Loader has been completed: " + ldr); - } - }); - - return ldr; - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * @param cacheName Cache name ({@code null} for default cache). - * @return Data loader. - */ - public IgniteDataStreamer dataLoader(@Nullable String cacheName) { - return dataLoader(cacheName, true); - } - - /** - * @param nodeId Sender ID. - * @param req Request. - */ - private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) { - if (!busyLock.enterBusy()) { - if (log.isDebugEnabled()) - log.debug("Ignoring data load request (node is stopping): " + req); - - return; - } - - try { - if (log.isDebugEnabled()) - log.debug("Processing data load request: " + req); - - Object topic; - - try { - topic = marsh.unmarshal(req.responseTopicBytes(), null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal topic from request: " + req, e); - - return; - } - - ClassLoader clsLdr; - - if (req.forceLocalDeployment()) - clsLdr = U.gridClassLoader(); - else { - GridDeployment dep = ctx.deploy().getGlobalDeployment( - req.deploymentMode(), - req.sampleClassName(), - req.sampleClassName(), - req.userVersion(), - nodeId, - req.classLoaderId(), - req.participants(), - null); - - if (dep == null) { - sendResponse(nodeId, - topic, - req.requestId(), - new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + - ", req=" + req + ']'), - false); - - return; - } - - clsLdr = dep.classLoader(); - } - - Collection> col; - IgniteDataStreamer.Updater updater; - - try { - col = marsh.unmarshal(req.collectionBytes(), clsLdr); - updater = marsh.unmarshal(req.updaterBytes(), clsLdr); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); - - sendResponse(nodeId, topic, req.requestId(), e, false); - - return; - } - - GridDataLoadUpdateJob job = new GridDataLoadUpdateJob<>(ctx, - log, - req.cacheName(), - col, - req.ignoreDeploymentOwnership(), - req.skipStore(), - updater); - - Exception err = null; - - try { - job.call(); - } - catch (Exception e) { - U.error(log, "Failed to finish update job.", e); - - err = e; - } - - sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment()); - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * @param nodeId Node ID. - * @param resTopic Response topic. - * @param reqId Request ID. - * @param err Error. - * @param forceLocDep Force local deployment. - */ - private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, - boolean forceLocDep) { - byte[] errBytes; - - try { - errBytes = err != null ? marsh.marshal(err) : null; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal message.", e); - - return; - } - - GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep); - - try { - ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL); - } - catch (IgniteCheckedException e) { - if (ctx.discovery().alive(nodeId)) - U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e); - else if (log.isDebugEnabled()) - log.debug("Node has left the grid: " + nodeId); - } - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - X.println(">>>"); - X.println(">>> Data loader processor memory stats [grid=" + ctx.gridName() + ']'); - X.println(">>> ldrsSize: " + ldrs.size()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java new file mode 100644 index 0000000..a2effbc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.thread.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + +/** + * + */ +public class IgniteDataStreamerProcessor extends GridProcessorAdapter { + /** Loaders map (access is not supposed to be highly concurrent). */ + private Collection ldrs = new GridConcurrentHashSet<>(); + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** Flushing thread. */ + private Thread flusher; + + /** */ + private final DelayQueue> flushQ = new DelayQueue<>(); + + /** Marshaller. */ + private final Marshaller marsh; + + /** + * @param ctx Kernal context. + */ + public IgniteDataStreamerProcessor(GridKernalContext ctx) { + super(ctx); + + ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg instanceof GridDataLoadRequest; + + processDataLoadRequest(nodeId, (GridDataLoadRequest)msg); + } + }); + + marsh = ctx.config().getMarshaller(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (ctx.config().isDaemon()) + return; + + flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) { + @Override protected void body() throws InterruptedException { + while (!isCancelled()) { + IgniteDataStreamerImpl ldr = flushQ.take(); + + if (!busyLock.enterBusy()) + return; + + try { + if (ldr.isClosed()) + continue; + + ldr.tryFlush(); + + flushQ.offer(ldr); + } + finally { + busyLock.leaveBusy(); + } + } + } + }); + + flusher.start(); + + if (log.isDebugEnabled()) + log.debug("Started data loader processor."); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + if (ctx.config().isDaemon()) + return; + + ctx.io().removeMessageListener(TOPIC_DATALOAD); + + busyLock.block(); + + U.interrupt(flusher); + U.join(flusher, log); + + for (IgniteDataStreamerImpl ldr : ldrs) { + if (log.isDebugEnabled()) + log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']'); + + try { + ldr.closeEx(cancel); + } + catch (IgniteInterruptedCheckedException e) { + U.warn(log, "Interrupted while waiting for completion of the data loader: " + ldr, e); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to close data loader: " + ldr, e); + } + } + + if (log.isDebugEnabled()) + log.debug("Stopped data loader processor."); + } + + /** + * @param cacheName Cache name ({@code null} for default cache). + * @param compact {@code true} if data loader should transfer data in compact format. + * @return Data loader. + */ + public IgniteDataStreamerImpl dataLoader(@Nullable String cacheName, boolean compact) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to create data loader (grid is stopping)."); + + try { + final IgniteDataStreamerImpl ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact); + + ldrs.add(ldr); + + ldr.internalFuture().listenAsync(new CI1>() { + @Override public void apply(IgniteInternalFuture f) { + boolean b = ldrs.remove(ldr); + + assert b : "Loader has not been added to set: " + ldr; + + if (log.isDebugEnabled()) + log.debug("Loader has been completed: " + ldr); + } + }); + + return ldr; + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param cacheName Cache name ({@code null} for default cache). + * @return Data loader. + */ + public IgniteDataStreamer dataLoader(@Nullable String cacheName) { + return dataLoader(cacheName, true); + } + + /** + * @param nodeId Sender ID. + * @param req Request. + */ + private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) { + if (!busyLock.enterBusy()) { + if (log.isDebugEnabled()) + log.debug("Ignoring data load request (node is stopping): " + req); + + return; + } + + try { + if (log.isDebugEnabled()) + log.debug("Processing data load request: " + req); + + Object topic; + + try { + topic = marsh.unmarshal(req.responseTopicBytes(), null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal topic from request: " + req, e); + + return; + } + + ClassLoader clsLdr; + + if (req.forceLocalDeployment()) + clsLdr = U.gridClassLoader(); + else { + GridDeployment dep = ctx.deploy().getGlobalDeployment( + req.deploymentMode(), + req.sampleClassName(), + req.sampleClassName(), + req.userVersion(), + nodeId, + req.classLoaderId(), + req.participants(), + null); + + if (dep == null) { + sendResponse(nodeId, + topic, + req.requestId(), + new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId + + ", req=" + req + ']'), + false); + + return; + } + + clsLdr = dep.classLoader(); + } + + Collection> col; + IgniteDataStreamer.Updater updater; + + try { + col = marsh.unmarshal(req.collectionBytes(), clsLdr); + updater = marsh.unmarshal(req.updaterBytes(), clsLdr); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e); + + sendResponse(nodeId, topic, req.requestId(), e, false); + + return; + } + + GridDataLoadUpdateJob job = new GridDataLoadUpdateJob<>(ctx, + log, + req.cacheName(), + col, + req.ignoreDeploymentOwnership(), + req.skipStore(), + updater); + + Exception err = null; + + try { + job.call(); + } + catch (Exception e) { + U.error(log, "Failed to finish update job.", e); + + err = e; + } + + sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment()); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param nodeId Node ID. + * @param resTopic Response topic. + * @param reqId Request ID. + * @param err Error. + * @param forceLocDep Force local deployment. + */ + private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err, + boolean forceLocDep) { + byte[] errBytes; + + try { + errBytes = err != null ? marsh.marshal(err) : null; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal message.", e); + + return; + } + + GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep); + + try { + ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + if (ctx.discovery().alive(nodeId)) + U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e); + else if (log.isDebugEnabled()) + log.debug("Node has left the grid: " + nodeId); + } + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>>"); + X.println(">>> Data loader processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> ldrsSize: " + ldrs.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java deleted file mode 100644 index 5959957..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java +++ /dev/null @@ -1,924 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.dataload; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.events.EventType.*; - -/** - * - */ -public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { - /** */ - private static ConcurrentHashMap storeMap; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private CacheMode mode = PARTITIONED; - - /** */ - private boolean nearEnabled = true; - - /** */ - private boolean useCache; - - /** */ - private TestStore store; - - /** {@inheritDoc} */ - @Override public void afterTest() throws Exception { - super.afterTest(); - - useCache = false; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "unchecked"}) - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - cfg.setIncludeProperties(); - - cfg.setMarshaller(new OptimizedMarshaller(false)); - - if (useCache) { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(mode); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cc.setWriteSynchronizationMode(FULL_SYNC); - - cc.setEvictSynchronized(false); - cc.setEvictNearSynchronized(false); - - if (store != null) { - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - } - - cfg.setCacheConfiguration(cc); - } - else - cfg.setCacheConfiguration(); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testPartitioned() throws Exception { - mode = PARTITIONED; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testColocated() throws Exception { - mode = PARTITIONED; - nearEnabled = false; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicated() throws Exception { - mode = REPLICATED; - - checkDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testLocal() throws Exception { - mode = LOCAL; - - try { - checkDataLoader(); - - assert false; - } - catch (IgniteCheckedException e) { - // Cannot load local cache configured remotely. - info("Caught expected exception: " + e); - } - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("ErrorNotRethrown") - private void checkDataLoader() throws Exception { - try { - Ignite g1 = startGrid(1); - - useCache = true; - - Ignite g2 = startGrid(2); - startGrid(3); - - final IgniteDataStreamer ldr = g1.dataLoader(null); - - ldr.updater(GridDataLoadCacheUpdaters.batchedSorted()); - - final AtomicInteger idxGen = new AtomicInteger(); - final int cnt = 400; - final int threads = 10; - - final CountDownLatch l1 = new CountDownLatch(threads); - - IgniteInternalFuture f1 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - Collection> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - int idx = idxGen.getAndIncrement(); - - futs.add(ldr.addData(idx, idx)); - } - - l1.countDown(); - - for (IgniteFuture fut : futs) - fut.get(); - - return null; - } - }, threads); - - l1.await(); - - // This will wait until data loader finishes loading. - stopGrid(getTestGridName(1), false); - - f1.get(); - - int s2 = internalCache(2).primaryKeySet().size(); - int s3 = internalCache(3).primaryKeySet().size(); - int total = threads * cnt; - - assertEquals(total, s2 + s3); - - final IgniteDataStreamer rmvLdr = g2.dataLoader(null); - - rmvLdr.updater(GridDataLoadCacheUpdaters.batchedSorted()); - - final CountDownLatch l2 = new CountDownLatch(threads); - - IgniteInternalFuture f2 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - Collection> futs = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - final int key = idxGen.decrementAndGet(); - - futs.add(rmvLdr.removeData(key)); - } - - l2.countDown(); - - for (IgniteFuture fut : futs) - fut.get(); - - return null; - } - }, threads); - - l2.await(); - - rmvLdr.close(false); - - f2.get(); - - s2 = internalCache(2).primaryKeySet().size(); - s3 = internalCache(3).primaryKeySet().size(); - - assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedIsolated() throws Exception { - mode = PARTITIONED; - - checkIsolatedDataLoader(); - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedIsolated() throws Exception { - mode = REPLICATED; - - checkIsolatedDataLoader(); - } - - /** - * @throws Exception If failed. - */ - private void checkIsolatedDataLoader() throws Exception { - try { - useCache = true; - - Ignite g1 = startGrid(0); - startGrid(1); - startGrid(2); - - awaitPartitionMapExchange(); - - GridCache cache = ((IgniteKernal)grid(0)).cache(null); - - for (int i = 0; i < 100; i++) - cache.put(i, -1); - - final int cnt = 40_000; - final int threads = 10; - - try (final IgniteDataStreamer ldr = g1.dataLoader(null)) { - final AtomicInteger idxGen = new AtomicInteger(); - - IgniteInternalFuture f1 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - for (int i = 0; i < cnt; i++) { - int idx = idxGen.getAndIncrement(); - - ldr.addData(idx, idx); - } - - return null; - } - }, threads); - - f1.get(); - } - - for (int g = 0; g < 3; g++) { - ClusterNode locNode = grid(g).localNode(); - - GridCacheAdapter cache0 = ((IgniteKernal)grid(g)).internalCache(null); - - if (cache0.isNear()) - cache0 = ((GridNearCacheAdapter)cache0).dht(); - - CacheAffinity aff = cache0.affinity(); - - for (int key = 0; key < cnt * threads; key++) { - if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { - GridCacheEntryEx entry = cache0.peekEx(key); - - assertNotNull("Missing entry for key: " + key, entry); - assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false)); - } - } - } - } - finally { - stopAllGrids(); - } - } - - /** - * Test primitive arrays can be passed into data loader. - * - * @throws Exception If failed. - */ - public void testPrimitiveArrays() throws Exception { - try { - useCache = true; - mode = PARTITIONED; - - Ignite g1 = startGrid(1); - startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). - - List arrays = Arrays.asList( - new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, - new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); - - IgniteDataStreamer dataLdr = g1.dataLoader(null); - - for (int i = 0, size = arrays.size(); i < 1000; i++) { - Object arr = arrays.get(i % size); - - dataLdr.addData(i, arr); - dataLdr.addData(i, fixedClosure(arr)); - } - - dataLdr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedMultiThreaded() throws Exception { - mode = REPLICATED; - - checkLoaderMultithreaded(1, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPartitionedMultiThreaded() throws Exception { - mode = PARTITIONED; - - checkLoaderMultithreaded(1, 3); - } - - /** - * Tests loader in multithreaded environment with various count of grids started. - * - * @param nodesCntNoCache How many nodes should be started without cache. - * @param nodesCntCache How many nodes should be started with cache. - * @throws Exception If failed. - */ - protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) - throws Exception { - try { - // Start all required nodes. - int idx = 1; - - for (int i = 0; i < nodesCntNoCache; i++) - startGrid(idx++); - - useCache = true; - - for (int i = 0; i < nodesCntCache; i++) - startGrid(idx++); - - Ignite g1 = grid(1); - - // Get and configure loader. - final IgniteDataStreamer ldr = g1.dataLoader(null); - - ldr.updater(GridDataLoadCacheUpdaters.individual()); - ldr.perNodeBufferSize(2); - - // Define count of puts. - final AtomicInteger idxGen = new AtomicInteger(); - - final AtomicBoolean done = new AtomicBoolean(); - - try { - final int totalPutCnt = 50000; - - IgniteInternalFuture fut1 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - Collection> futs = new ArrayList<>(); - - while (!done.get()) { - int idx = idxGen.getAndIncrement(); - - if (idx >= totalPutCnt) { - info(">>> Stopping producer thread since maximum count of puts reached."); - - break; - } - - futs.add(ldr.addData(idx, idx)); - } - - ldr.flush(); - - for (IgniteFuture fut : futs) - fut.get(); - - return null; - } - }, 5, "producer"); - - IgniteInternalFuture fut2 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - while (!done.get()) { - ldr.flush(); - - U.sleep(100); - } - - return null; - } - }, 1, "flusher"); - - // Define index of node being restarted. - final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; - - IgniteInternalFuture fut3 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - try { - for (int i = 0; i < 5; i++) { - Ignite g = startGrid(restartNodeIdx); - - UUID id = g.cluster().localNode().id(); - - info(">>>>>>> Started node: " + id); - - U.sleep(1000); - - stopGrid(getTestGridName(restartNodeIdx), true); - - info(">>>>>>> Stopped node: " + id); - } - } - finally { - done.set(true); - - info("Start stop thread finished."); - } - - return null; - } - }, 1, "start-stop-thread"); - - fut1.get(); - fut2.get(); - fut3.get(); - } - finally { - ldr.close(false); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testLoaderApi() throws Exception { - useCache = true; - - try { - Ignite g1 = startGrid(1); - - IgniteDataStreamer ldr = g1.dataLoader(null); - - ldr.close(false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - - try { - // Create another loader. - ldr = g1.dataLoader("UNKNOWN_CACHE"); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - ldr.close(true); - - assert ldr.future().isDone(); - - ldr.future().get(); - - // Create another loader. - ldr = g1.dataLoader(null); - - // Cancel with future. - ldr.future().cancel(); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - try { - ldr.future().get(); - - assert false; - } - catch (IgniteFutureCancelledException e) { - info("Caught expected exception: " + e); - } - - // Create another loader. - ldr = g1.dataLoader(null); - - // This will close loader. - stopGrid(getTestGridName(1), false); - - try { - ldr.addData(0, 0); - - assert false; - } - catch (IllegalStateException e) { - info("Caught expected exception: " + e); - } - - assert ldr.future().isDone(); - - ldr.future().get(); - } - finally { - stopAllGrids(); - } - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Callable. - */ - private static Callable callable(@Nullable final Integer i) { - return new Callable() { - @Override public Integer call() throws Exception { - return i; - } - }; - } - - /** - * Wraps integer to closure returning it. - * - * @param i Value to wrap. - * @return Closure. - */ - private static IgniteClosure closure(@Nullable final Integer i) { - return new IgniteClosure() { - @Override public Integer apply(Integer e) { - return e == null ? i : e + i; - } - }; - } - - /** - * Wraps object to closure returning it. - * - * @param obj Value to wrap. - * @return Closure. - */ - private static IgniteClosure fixedClosure(@Nullable final T obj) { - return new IgniteClosure() { - @Override public T apply(T e) { - assert e == null || obj == null || e.getClass() == obj.getClass() : - "Expects the same types [e=" + e + ", obj=" + obj + ']'; - - return obj; - } - }; - } - - /** - * Wraps integer to closure expecting it and returning {@code null}. - * - * @param exp Expected closure value. - * @return Remove expected cache value closure. - */ - private static IgniteClosure removeClosure(@Nullable final T exp) { - return new IgniteClosure() { - @Override public T apply(T act) { - if (exp == null ? act == null : exp.equals(act)) - return null; - - throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); - } - }; - } - - /** - * @throws Exception If failed. - */ - public void testFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final IgniteCache c = g.jcache(null); - - final IgniteDataStreamer ldr = g.dataLoader(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - multithreaded(new Callable() { - @Override - public Void call() throws Exception { - ldr.flush(); - - assertEquals(9, c.size()); - - return null; - } - }, 5, "flush-checker"); - - ldr.addData(100, 100); - - ldr.flush(); - - assertEquals(10, c.size()); - - ldr.addData(200, 200); - - ldr.close(false); - - ldr.future().get(); - - assertEquals(11, c.size()); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testTryFlush() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - IgniteCache c = g.jcache(null); - - IgniteDataStreamer ldr = g.dataLoader(null); - - ldr.perNodeBufferSize(10); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - ldr.tryFlush(); - - Thread.sleep(100); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testFlushTimeout() throws Exception { - mode = LOCAL; - - useCache = true; - - try { - Ignite g = startGrid(); - - final CountDownLatch latch = new CountDownLatch(9); - - g.events().localListen(new IgnitePredicate() { - @Override public boolean apply(Event evt) { - latch.countDown(); - - return true; - } - }, EVT_CACHE_OBJECT_PUT); - - IgniteCache c = g.jcache(null); - - assertTrue(c.localSize() == 0); - - IgniteDataStreamer ldr = g.dataLoader(null); - - ldr.perNodeBufferSize(10); - ldr.autoFlushFrequency(3000); - ldr.allowOverwrite(true); - - for (int i = 0; i < 9; i++) - ldr.addData(i, i); - - assertTrue(c.localSize() == 0); - - assertFalse(latch.await(1000, MILLISECONDS)); - - assertTrue(c.localSize() == 0); - - assertTrue(latch.await(3000, MILLISECONDS)); - - assertEquals(9, c.size()); - - ldr.close(false); - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testUpdateStore() throws Exception { - storeMap = new ConcurrentHashMap<>(); - - try { - store = new TestStore(); - - useCache = true; - - Ignite ignite = startGrid(1); - - startGrid(2); - startGrid(3); - - for (int i = 0; i < 1000; i++) - storeMap.put(i, i); - - try (IgniteDataStreamer ldr = ignite.dataLoader(null)) { - ldr.allowOverwrite(true); - - assertFalse(ldr.skipStore()); - - for (int i = 0; i < 1000; i++) - ldr.removeData(i); - - for (int i = 1000; i < 2000; i++) - ldr.addData(i, i); - } - - for (int i = 0; i < 1000; i++) - assertNull(storeMap.get(i)); - - for (int i = 1000; i < 2000; i++) - assertEquals(i, storeMap.get(i)); - - try (IgniteDataStreamer ldr = ignite.dataLoader(null)) { - ldr.allowOverwrite(true); - - ldr.skipStore(true); - - for (int i = 0; i < 1000; i++) - ldr.addData(i, i); - - for (int i = 1000; i < 2000; i++) - ldr.removeData(i); - } - - IgniteCache cache = ignite.jcache(null); - - for (int i = 0; i < 1000; i++) { - assertNull(storeMap.get(i)); - - assertEquals(i, cache.get(i)); - } - - for (int i = 1000; i < 2000; i++) { - assertEquals(i, storeMap.get(i)); - - assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); - } - } - finally { - storeMap = null; - } - } - - /** - * - */ - private static class TestObject { - /** Value. */ - private final int val; - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - TestObject obj = (TestObject)o; - - return val == obj.val; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return val; - } - } - - /** - * - */ - private static class TestStore extends CacheStoreAdapter { - /** {@inheritDoc} */ - @Nullable @Override public Object load(Object key) { - return storeMap.get(key); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry entry) { - storeMap.put(entry.getKey(), entry.getValue()); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - storeMap.remove(key); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java new file mode 100644 index 0000000..393642d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java @@ -0,0 +1,924 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest { + /** */ + private static ConcurrentHashMap storeMap; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private CacheMode mode = PARTITIONED; + + /** */ + private boolean nearEnabled = true; + + /** */ + private boolean useCache; + + /** */ + private TestStore store; + + /** {@inheritDoc} */ + @Override public void afterTest() throws Exception { + super.afterTest(); + + useCache = false; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setEvictSynchronized(false); + cc.setEvictNearSynchronized(false); + + if (store != null) { + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + } + + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + mode = PARTITIONED; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testColocated() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + mode = REPLICATED; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + mode = LOCAL; + + try { + checkDataLoader(); + + assert false; + } + catch (IgniteCheckedException e) { + // Cannot load local cache configured remotely. + info("Caught expected exception: " + e); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("ErrorNotRethrown") + private void checkDataLoader() throws Exception { + try { + Ignite g1 = startGrid(1); + + useCache = true; + + Ignite g2 = startGrid(2); + startGrid(3); + + final IgniteDataStreamer ldr = g1.dataLoader(null); + + ldr.updater(GridDataLoadCacheUpdaters.batchedSorted()); + + final AtomicInteger idxGen = new AtomicInteger(); + final int cnt = 400; + final int threads = 10; + + final CountDownLatch l1 = new CountDownLatch(threads); + + IgniteInternalFuture f1 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + Collection> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + futs.add(ldr.addData(idx, idx)); + } + + l1.countDown(); + + for (IgniteFuture fut : futs) + fut.get(); + + return null; + } + }, threads); + + l1.await(); + + // This will wait until data loader finishes loading. + stopGrid(getTestGridName(1), false); + + f1.get(); + + int s2 = internalCache(2).primaryKeySet().size(); + int s3 = internalCache(3).primaryKeySet().size(); + int total = threads * cnt; + + assertEquals(total, s2 + s3); + + final IgniteDataStreamer rmvLdr = g2.dataLoader(null); + + rmvLdr.updater(GridDataLoadCacheUpdaters.batchedSorted()); + + final CountDownLatch l2 = new CountDownLatch(threads); + + IgniteInternalFuture f2 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + Collection> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + final int key = idxGen.decrementAndGet(); + + futs.add(rmvLdr.removeData(key)); + } + + l2.countDown(); + + for (IgniteFuture fut : futs) + fut.get(); + + return null; + } + }, threads); + + l2.await(); + + rmvLdr.close(false); + + f2.get(); + + s2 = internalCache(2).primaryKeySet().size(); + s3 = internalCache(3).primaryKeySet().size(); + + assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedIsolated() throws Exception { + mode = PARTITIONED; + + checkIsolatedDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedIsolated() throws Exception { + mode = REPLICATED; + + checkIsolatedDataLoader(); + } + + /** + * @throws Exception If failed. + */ + private void checkIsolatedDataLoader() throws Exception { + try { + useCache = true; + + Ignite g1 = startGrid(0); + startGrid(1); + startGrid(2); + + awaitPartitionMapExchange(); + + GridCache cache = ((IgniteKernal)grid(0)).cache(null); + + for (int i = 0; i < 100; i++) + cache.put(i, -1); + + final int cnt = 40_000; + final int threads = 10; + + try (final IgniteDataStreamer ldr = g1.dataLoader(null)) { + final AtomicInteger idxGen = new AtomicInteger(); + + IgniteInternalFuture f1 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + ldr.addData(idx, idx); + } + + return null; + } + }, threads); + + f1.get(); + } + + for (int g = 0; g < 3; g++) { + ClusterNode locNode = grid(g).localNode(); + + GridCacheAdapter cache0 = ((IgniteKernal)grid(g)).internalCache(null); + + if (cache0.isNear()) + cache0 = ((GridNearCacheAdapter)cache0).dht(); + + CacheAffinity aff = cache0.affinity(); + + for (int key = 0; key < cnt * threads; key++) { + if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) { + GridCacheEntryEx entry = cache0.peekEx(key); + + assertNotNull("Missing entry for key: " + key, entry); + assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false)); + } + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * Test primitive arrays can be passed into data loader. + * + * @throws Exception If failed. + */ + public void testPrimitiveArrays() throws Exception { + try { + useCache = true; + mode = PARTITIONED; + + Ignite g1 = startGrid(1); + startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). + + List arrays = Arrays.asList( + new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, + new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); + + IgniteDataStreamer dataLdr = g1.dataLoader(null); + + for (int i = 0, size = arrays.size(); i < 1000; i++) { + Object arr = arrays.get(i % size); + + dataLdr.addData(i, arr); + dataLdr.addData(i, fixedClosure(arr)); + } + + dataLdr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedMultiThreaded() throws Exception { + mode = REPLICATED; + + checkLoaderMultithreaded(1, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedMultiThreaded() throws Exception { + mode = PARTITIONED; + + checkLoaderMultithreaded(1, 3); + } + + /** + * Tests loader in multithreaded environment with various count of grids started. + * + * @param nodesCntNoCache How many nodes should be started without cache. + * @param nodesCntCache How many nodes should be started with cache. + * @throws Exception If failed. + */ + protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) + throws Exception { + try { + // Start all required nodes. + int idx = 1; + + for (int i = 0; i < nodesCntNoCache; i++) + startGrid(idx++); + + useCache = true; + + for (int i = 0; i < nodesCntCache; i++) + startGrid(idx++); + + Ignite g1 = grid(1); + + // Get and configure loader. + final IgniteDataStreamer ldr = g1.dataLoader(null); + + ldr.updater(GridDataLoadCacheUpdaters.individual()); + ldr.perNodeBufferSize(2); + + // Define count of puts. + final AtomicInteger idxGen = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + try { + final int totalPutCnt = 50000; + + IgniteInternalFuture fut1 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + Collection> futs = new ArrayList<>(); + + while (!done.get()) { + int idx = idxGen.getAndIncrement(); + + if (idx >= totalPutCnt) { + info(">>> Stopping producer thread since maximum count of puts reached."); + + break; + } + + futs.add(ldr.addData(idx, idx)); + } + + ldr.flush(); + + for (IgniteFuture fut : futs) + fut.get(); + + return null; + } + }, 5, "producer"); + + IgniteInternalFuture fut2 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + while (!done.get()) { + ldr.flush(); + + U.sleep(100); + } + + return null; + } + }, 1, "flusher"); + + // Define index of node being restarted. + final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; + + IgniteInternalFuture fut3 = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 5; i++) { + Ignite g = startGrid(restartNodeIdx); + + UUID id = g.cluster().localNode().id(); + + info(">>>>>>> Started node: " + id); + + U.sleep(1000); + + stopGrid(getTestGridName(restartNodeIdx), true); + + info(">>>>>>> Stopped node: " + id); + } + } + finally { + done.set(true); + + info("Start stop thread finished."); + } + + return null; + } + }, 1, "start-stop-thread"); + + fut1.get(); + fut2.get(); + fut3.get(); + } + finally { + ldr.close(false); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testLoaderApi() throws Exception { + useCache = true; + + try { + Ignite g1 = startGrid(1); + + IgniteDataStreamer ldr = g1.dataLoader(null); + + ldr.close(false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + + try { + // Create another loader. + ldr = g1.dataLoader("UNKNOWN_CACHE"); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + ldr.close(true); + + assert ldr.future().isDone(); + + ldr.future().get(); + + // Create another loader. + ldr = g1.dataLoader(null); + + // Cancel with future. + ldr.future().cancel(); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + try { + ldr.future().get(); + + assert false; + } + catch (IgniteFutureCancelledException e) { + info("Caught expected exception: " + e); + } + + // Create another loader. + ldr = g1.dataLoader(null); + + // This will close loader. + stopGrid(getTestGridName(1), false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + } + finally { + stopAllGrids(); + } + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Callable. + */ + private static Callable callable(@Nullable final Integer i) { + return new Callable() { + @Override public Integer call() throws Exception { + return i; + } + }; + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Closure. + */ + private static IgniteClosure closure(@Nullable final Integer i) { + return new IgniteClosure() { + @Override public Integer apply(Integer e) { + return e == null ? i : e + i; + } + }; + } + + /** + * Wraps object to closure returning it. + * + * @param obj Value to wrap. + * @return Closure. + */ + private static IgniteClosure fixedClosure(@Nullable final T obj) { + return new IgniteClosure() { + @Override public T apply(T e) { + assert e == null || obj == null || e.getClass() == obj.getClass() : + "Expects the same types [e=" + e + ", obj=" + obj + ']'; + + return obj; + } + }; + } + + /** + * Wraps integer to closure expecting it and returning {@code null}. + * + * @param exp Expected closure value. + * @return Remove expected cache value closure. + */ + private static IgniteClosure removeClosure(@Nullable final T exp) { + return new IgniteClosure() { + @Override public T apply(T act) { + if (exp == null ? act == null : exp.equals(act)) + return null; + + throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); + } + }; + } + + /** + * @throws Exception If failed. + */ + public void testFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final IgniteCache c = g.jcache(null); + + final IgniteDataStreamer ldr = g.dataLoader(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + multithreaded(new Callable() { + @Override + public Void call() throws Exception { + ldr.flush(); + + assertEquals(9, c.size()); + + return null; + } + }, 5, "flush-checker"); + + ldr.addData(100, 100); + + ldr.flush(); + + assertEquals(10, c.size()); + + ldr.addData(200, 200); + + ldr.close(false); + + ldr.future().get(); + + assertEquals(11, c.size()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTryFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + IgniteCache c = g.jcache(null); + + IgniteDataStreamer ldr = g.dataLoader(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + ldr.tryFlush(); + + Thread.sleep(100); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFlushTimeout() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final CountDownLatch latch = new CountDownLatch(9); + + g.events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + latch.countDown(); + + return true; + } + }, EVT_CACHE_OBJECT_PUT); + + IgniteCache c = g.jcache(null); + + assertTrue(c.localSize() == 0); + + IgniteDataStreamer ldr = g.dataLoader(null); + + ldr.perNodeBufferSize(10); + ldr.autoFlushFrequency(3000); + ldr.allowOverwrite(true); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.localSize() == 0); + + assertFalse(latch.await(1000, MILLISECONDS)); + + assertTrue(c.localSize() == 0); + + assertTrue(latch.await(3000, MILLISECONDS)); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testUpdateStore() throws Exception { + storeMap = new ConcurrentHashMap<>(); + + try { + store = new TestStore(); + + useCache = true; + + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + for (int i = 0; i < 1000; i++) + storeMap.put(i, i); + + try (IgniteDataStreamer ldr = ignite.dataLoader(null)) { + ldr.allowOverwrite(true); + + assertFalse(ldr.skipStore()); + + for (int i = 0; i < 1000; i++) + ldr.removeData(i); + + for (int i = 1000; i < 2000; i++) + ldr.addData(i, i); + } + + for (int i = 0; i < 1000; i++) + assertNull(storeMap.get(i)); + + for (int i = 1000; i < 2000; i++) + assertEquals(i, storeMap.get(i)); + + try (IgniteDataStreamer ldr = ignite.dataLoader(null)) { + ldr.allowOverwrite(true); + + ldr.skipStore(true); + + for (int i = 0; i < 1000; i++) + ldr.addData(i, i); + + for (int i = 1000; i < 2000; i++) + ldr.removeData(i); + } + + IgniteCache cache = ignite.jcache(null); + + for (int i = 0; i < 1000; i++) { + assertNull(storeMap.get(i)); + + assertEquals(i, cache.get(i)); + } + + for (int i = 1000; i < 2000; i++) { + assertEquals(i, storeMap.get(i)); + + assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); + } + } + finally { + storeMap = null; + } + } + + /** + * + */ + private static class TestObject { + /** Value. */ + private final int val; + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestObject obj = (TestObject)o; + + return val == obj.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter { + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b27e1f3d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 24c8e04..a30eea5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -109,7 +109,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheBalancingStoreSelfTest.class); suite.addTestSuite(GridCacheAffinityApiSelfTest.class); suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class); - suite.addTestSuite(GridDataLoaderProcessorSelfTest.class); + suite.addTestSuite(IgniteDataStreamerProcessorSelfTest.class); suite.addTestSuite(GridDataLoaderImplSelfTest.class); suite.addTestSuite(GridCacheEntryMemorySizeSelfTest.class); suite.addTestSuite(GridCacheClearAllSelfTest.class);