Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 20A4E18C43 for ; Tue, 9 Jun 2015 06:48:26 +0000 (UTC) Received: (qmail 37771 invoked by uid 500); 9 Jun 2015 06:48:26 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 37735 invoked by uid 500); 9 Jun 2015 06:48:26 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 37722 invoked by uid 99); 9 Jun 2015 06:48:26 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 06:48:26 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 09 Jun 2015 06:46:00 +0000 Received: (qmail 34429 invoked by uid 99); 9 Jun 2015 06:47:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Jun 2015 06:47:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C3807E025B; Tue, 9 Jun 2015 06:47:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sergi@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 09 Jun 2015 06:48:08 -0000 Message-Id: <90d205c8c4954c2aabdafc61f4b47341@git.apache.org> In-Reply-To: <2f02124015af44e0a406305c0c682c57@git.apache.org> References: <2f02124015af44e0a406305c0c682c57@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/50] incubator-ignite git commit: # ignite-sprint-5 use consistent topology version in streamer X-Virus-Checked: Checked by ClamAV on apache.org # ignite-sprint-5 use consistent topology version in streamer Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc1d427f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc1d427f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc1d427f Branch: refs/heads/ignite-484-1 Commit: dc1d427fa2f5235a5779058a444e3845946a7e07 Parents: 20e5677 Author: sboikov Authored: Fri Jun 5 09:54:37 2015 +0300 Committer: sboikov Committed: Fri Jun 5 11:21:36 2015 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityProcessor.java | 23 ++++- .../datastreamer/DataStreamerImpl.java | 92 ++++++++++++++------ .../DataStreamerMultiThreadedSelfTest.java | 59 +++++++++---- 3 files changed, 129 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index daa2bc2..aac63c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -164,14 +164,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * * @param cacheName Cache name. * @param key Key to map. + * @param topVer Topology version. * @return Affinity nodes, primary first. * @throws IgniteCheckedException If failed. */ - public List mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) throws IgniteCheckedException { + public List mapKeyToPrimaryAndBackups(@Nullable String cacheName, + K key, + AffinityTopologyVersion topVer) + throws IgniteCheckedException + { A.notNull(key, "key"); - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); - AffinityInfo affInfo = affinityCache(cacheName, topVer); if (affInfo == null) @@ -181,6 +184,20 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * Map single key to primary and backup nodes. + * + * @param cacheName Cache name. + * @param key Key to map. + * @return Affinity nodes, primary first. + * @throws IgniteCheckedException If failed. + */ + public List mapKeyToPrimaryAndBackups(@Nullable String cacheName, K key) + throws IgniteCheckedException + { + return mapKeyToPrimaryAndBackups(cacheName, key, ctx.discovery().topologyVersionEx()); + } + + /** * Gets affinity key for cache key. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index d16167a..ed8e573 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -198,19 +198,14 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed // Remap regular mappings. final Buffer buf = bufMappings.remove(id); + // Only async notification is possible since + // discovery thread may be trapped otherwise. if (buf != null) { - // Only async notification is possible since - // discovery thread may be trapped otherwise. - ctx.closure().callLocalSafe( - new Callable() { - @Override public Object call() throws Exception { - buf.onNodeLeft(); - - return null; - } - }, - true /* system pool */ - ); + waitAffinityAndRun(new Runnable() { + @Override public void run() { + buf.onNodeLeft(); + } + }, discoEvt.topologyVersion(), true); } } }; @@ -248,6 +243,31 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed } /** + * @param c Closure to run. + * @param topVer Topology version to wait for. + * @param async Async flag. + */ + private void waitAffinityAndRun(final Runnable c, long topVer, boolean async) { + AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, 0); + + IgniteInternalFuture fut = ctx.cache().context().exchange().affinityReadyFuture(topVer0); + + if (fut != null && !fut.isDone()) { + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + ctx.closure().runLocalSafe(c, true); + } + }); + } + else { + if (async) + ctx.closure().runLocalSafe(c, true); + else + c.run(); + } + } + + /** * @return Cache object context. */ public CacheObjectContext cacheObjectContext() { @@ -527,6 +547,8 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed boolean initPda = ctx.deploy().enabled() && jobPda == null; + AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion(); + for (DataStreamerEntry entry : entries) { List nodes; @@ -543,7 +565,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed initPda = false; } - nodes = nodes(key); + nodes = nodes(key, topVer); } catch (IgniteCheckedException e) { resFut.onDone(e); @@ -621,10 +643,10 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed } }; - GridFutureAdapter f; + final GridFutureAdapter f; try { - f = buf.update(entriesForNode, lsnr); + f = buf.update(entriesForNode, topVer, lsnr); } catch (IgniteInterruptedCheckedException e1) { resFut.onDone(e1); @@ -633,30 +655,38 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed } if (ctx.discovery().node(nodeId) == null) { - if (bufMappings.remove(nodeId, buf)) - buf.onNodeLeft(); + if (bufMappings.remove(nodeId, buf)) { + final Buffer buf0 = buf; - if (f != null) - f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + - "(node has left): " + nodeId)); + waitAffinityAndRun(new Runnable() { + @Override public void run() { + buf0.onNodeLeft(); + + if (f != null) + f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + + "(node has left): " + nodeId)); + } + }, ctx.discovery().topologyVersion(), false); + } } } } /** * @param key Key to map. + * @param topVer Topology version. * @return Nodes to send requests to. * @throws IgniteCheckedException If failed. */ - private List nodes(KeyCacheObject key) throws IgniteCheckedException { + private List nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException { GridAffinityProcessor aff = ctx.affinity(); List res = null; if (!allowOverwrite()) - res = aff.mapKeyToPrimaryAndBackups(cacheName, key); + res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer); else { - ClusterNode node = aff.mapKeyToNode(cacheName, key); + ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer); if (node != null) res = Collections.singletonList(node); @@ -959,11 +989,13 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** * @param newEntries Infos. + * @param topVer Topology version. * @param lsnr Listener for the operation future. * @throws IgniteInterruptedCheckedException If failed. * @return Future for operation. */ @Nullable GridFutureAdapter update(Iterable newEntries, + AffinityTopologyVersion topVer, IgniteInClosure> lsnr) throws IgniteInterruptedCheckedException { List entries0 = null; GridFutureAdapter curFut0; @@ -986,7 +1018,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed } if (entries0 != null) { - submit(entries0, curFut0); + submit(entries0, topVer, curFut0); if (cancelled) curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this)); @@ -1023,7 +1055,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed } if (entries0 != null) - submit(entries0, curFut0); + submit(entries0, null, curFut0); // Create compound future for this flush. GridCompoundFuture res = null; @@ -1068,10 +1100,13 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** * @param entries Entries to submit. + * @param topVer Topology version. * @param curFut Current future. * @throws IgniteInterruptedCheckedException If interrupted. */ - private void submit(final Collection entries, final GridFutureAdapter curFut) + private void submit(final Collection entries, + @Nullable AffinityTopologyVersion topVer, + final GridFutureAdapter curFut) throws IgniteInterruptedCheckedException { assert entries != null; assert !entries.isEmpty(); @@ -1160,6 +1195,9 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed reqs.put(reqId, (GridFutureAdapter)fut); + if (topVer == null) + topVer = ctx.cache().context().exchange().readyAffinityVersion(); + DataStreamerRequest req = new DataStreamerRequest( reqId, topicBytes, @@ -1174,7 +1212,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed dep != null ? dep.participants() : null, dep != null ? dep.classLoaderId() : null, dep == null, - ctx.cache().context().exchange().readyAffinityVersion()); + topVer); try { ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc1d427f/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java index 2382a66..e0092d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java @@ -41,6 +41,9 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { /** IP finder. */ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** */ + private boolean dynamicCache; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -50,13 +53,22 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(discoSpi); + if (!dynamicCache) + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { CacheConfiguration ccfg = defaultCacheConfiguration(); ccfg.setCacheMode(PARTITIONED); ccfg.setBackups(1); - cfg.setCacheConfiguration(ccfg); - return cfg; + return ccfg; } /** {@inheritDoc} */ @@ -68,6 +80,22 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testStartStopIgnites() throws Exception { + startStopIgnites(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStopIgnitesDynamicCache() throws Exception { + dynamicCache = true; + + startStopIgnites(); + } + + /** + * @throws Exception If failed. + */ + private void startStopIgnites() throws Exception { for (int attempt = 0; attempt < 3; ++attempt) { log.info("Iteration: " + attempt); @@ -75,28 +103,29 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { Set futs = new HashSet<>(); - IgniteInternalFuture fut; + final AtomicInteger igniteId = new AtomicInteger(1); - try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) { - dataLdr.maxRemapCount(0); + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + for (int i = 1; i < 5; ++i) + startGrid(igniteId.incrementAndGet()); - final AtomicInteger igniteId = new AtomicInteger(1); + return true; + } + }, 2, "start-node-thread"); - fut = GridTestUtils.runMultiThreadedAsync(new Callable() { - @Override public Object call() throws Exception { - for (int i = 1; i < 5; ++i) - startGrid(igniteId.incrementAndGet()); + if (dynamicCache) + ignite.getOrCreateCache(cacheConfiguration()); - return true; - } - }, 2, "start-node-thread"); + try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) { + dataLdr.maxRemapCount(0); - Random random = new Random(); + Random rnd = new Random(); long endTime = U.currentTimeMillis() + 15_000; while (!fut.isDone() && U.currentTimeMillis() < endTime) - futs.add(dataLdr.addData(random.nextInt(100_000), random.nextInt(100_000))); + futs.add(dataLdr.addData(rnd.nextInt(100_000), String.valueOf(rnd.nextInt(100_000)))); } for (IgniteFuture f : futs)