Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0CDCF10671 for ; Mon, 27 Apr 2015 11:03:05 +0000 (UTC) Received: (qmail 90101 invoked by uid 500); 27 Apr 2015 11:03:05 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 90025 invoked by uid 500); 27 Apr 2015 11:03:04 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 90015 invoked by uid 99); 27 Apr 2015 11:03:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2015 11:03:04 +0000 X-ASF-Spam-Status: No, hits=-0.0 required=5.0 tests=SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: local policy) Received: from [54.191.145.13] (HELO mx1-us-west.apache.org) (54.191.145.13) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2015 11:02:59 +0000 Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id 0F40F27367 for ; Mon, 27 Apr 2015 11:02:39 +0000 (UTC) Received: (qmail 84345 invoked by uid 99); 27 Apr 2015 11:02:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2015 11:02:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE426E116E; Mon, 27 Apr 2015 11:02:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 27 Apr 2015 11:02:40 -0000 Message-Id: <6b17acaadd4d4e1a9515381df45f09bc@git.apache.org> In-Reply-To: <10be23986d0a4291a0fb228082ff5ca1@git.apache.org> References: <10be23986d0a4291a0fb228082ff5ca1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/14] incubator-ignite git commit: #ignite-710: DataStreamer message is processed before cache started X-Virus-Checked: Checked by ClamAV on apache.org #ignite-710: DataStreamer message is processed before cache started Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f13b63d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f13b63d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f13b63d7 Branch: refs/heads/ignite-794 Commit: f13b63d73606e73f2f5d45c0b9291252444b7437 Parents: 0a173f4 Author: ivasilinets Authored: Fri Apr 24 10:29:58 2015 +0300 Committer: ivasilinets Committed: Fri Apr 24 10:29:58 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 2 +- .../datastreamer/DataStreamProcessor.java | 28 ++++- .../datastreamer/DataStreamerImpl.java | 3 +- .../datastreamer/DataStreamerRequest.java | 38 ++++++- .../DataStreamerMultiThreadedSelfTest.java | 101 +++++++++++++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 1 + 6 files changed, 166 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 851fc44..5f82ae2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -409,7 +409,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ - @Nullable IgniteInternalFuture affinityReadyFuture(AffinityTopologyVersion ver) { + public @Nullable IgniteInternalFuture affinityReadyFuture(AffinityTopologyVersion ver) { GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 3a2936f..9e53bb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -173,7 +174,7 @@ public class DataStreamProcessor extends GridProcessorAdapter { * @param nodeId Sender ID. * @param req Request. */ - private void processRequest(UUID nodeId, DataStreamerRequest req) { + private void processRequest(final UUID nodeId, final DataStreamerRequest req) { if (!busyLock.enterBusy()) { if (log.isDebugEnabled()) log.debug("Ignoring data load request (node is stopping): " + req); @@ -185,6 +186,31 @@ public class DataStreamProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Processing data load request: " + req); + AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion(); + AffinityTopologyVersion rmtAffVer = req.topologyVersion(); + + if (locAffVer.compareTo(rmtAffVer) < 0) { + if (log.isDebugEnabled()) + log.debug("Received request has higher affinity topology version [request=" + req + + ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); + + IgniteInternalFuture fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer); + + if (fut != null && !fut.isDone()) { + fut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture t) { + ctx.closure().runLocalSafe(new Runnable() { + @Override public void run() { + processRequest(nodeId, req); + } + }, false); + } + }); + + return; + } + } + Object topic; try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 002831c..a69e033 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -1173,7 +1173,8 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed dep != null ? dep.userVersion() : null, dep != null ? dep.participants() : null, dep != null ? dep.classLoaderId() : null, - dep == null); + dep == null, + ctx.cache().context().exchange().readyAffinityVersion()); try { ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index a216ffe..0d24ee0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.datastreamer; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -77,6 +78,9 @@ public class DataStreamerRequest implements Message { /** */ private boolean forceLocDep; + /** Topology version. */ + private AffinityTopologyVersion topVer; + /** * {@code Externalizable} support. */ @@ -98,6 +102,7 @@ public class DataStreamerRequest implements Message { * @param ldrParticipants Loader participants. * @param clsLdrId Class loader ID. * @param forceLocDep Force local deployment. + * @param topVer Topology version. */ public DataStreamerRequest(long reqId, byte[] resTopicBytes, @@ -111,7 +116,10 @@ public class DataStreamerRequest implements Message { String userVer, Map ldrParticipants, IgniteUuid clsLdrId, - boolean forceLocDep) { + boolean forceLocDep, + @NotNull AffinityTopologyVersion topVer) { + assert topVer != null; + this.reqId = reqId; this.resTopicBytes = resTopicBytes; this.cacheName = cacheName; @@ -125,6 +133,7 @@ public class DataStreamerRequest implements Message { this.ldrParticipants = ldrParticipants; this.clsLdrId = clsLdrId; this.forceLocDep = forceLocDep; + this.topVer = topVer; } /** @@ -218,6 +227,13 @@ public class DataStreamerRequest implements Message { return forceLocDep; } + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataStreamerRequest.class, this); @@ -302,12 +318,18 @@ public class DataStreamerRequest implements Message { writer.incrementState(); case 11: - if (!writer.writeByteArray("updaterBytes", updaterBytes)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 12: + if (!writer.writeByteArray("updaterBytes", updaterBytes)) + return false; + + writer.incrementState(); + + case 13: if (!writer.writeString("userVer", userVer)) return false; @@ -419,7 +441,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 11: - updaterBytes = reader.readByteArray("updaterBytes"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -427,6 +449,14 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 12: + updaterBytes = reader.readByteArray("updaterBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: userVer = reader.readString("userVer"); if (!reader.isLastRead()) @@ -446,6 +476,6 @@ public class DataStreamerRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 14; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java new file mode 100644 index 0000000..5eedd8d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastreamer; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests for DataStreamer. + */ +public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testStartStopIgnites() throws Exception { + for (int attempt = 0; attempt < 3; ++attempt) { + final Ignite ignite = startGrid(0); + + Set futs = new HashSet<>(); + + try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null)) { + dataLdr.maxRemapCount(0); + + final AtomicInteger igniteId = new AtomicInteger(1); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Object call() throws Exception { + for (int i = 1; i < 5; ++i) + startGrid(igniteId.incrementAndGet()); + + return true; + } + }, 2, "startedGridThread"); + + Random random = new Random(); + + while (!fut.isDone()) + futs.add(dataLdr.addData(random.nextInt(100000), random.nextInt(100000))); + } + + for (IgniteFuture f : futs) + f.get(); + + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f13b63d7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 49fbcbb..9a4451c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -114,6 +114,7 @@ public class IgniteCacheTestSuite extends TestSuite { suite.addTestSuite(GridCacheAffinityApiSelfTest.class); suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class); suite.addTestSuite(DataStreamProcessorSelfTest.class); + suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class); suite.addTestSuite(DataStreamerImplSelfTest.class); suite.addTestSuite(GridCacheEntryMemorySizeSelfTest.class); suite.addTestSuite(GridCacheClearAllSelfTest.class);