Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 828B317D3A for ; Thu, 9 Apr 2015 09:21:13 +0000 (UTC) Received: (qmail 20854 invoked by uid 500); 9 Apr 2015 09:20:51 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 20789 invoked by uid 500); 9 Apr 2015 09:20:51 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 20690 invoked by uid 99); 9 Apr 2015 09:20:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Apr 2015 09:20:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 09 Apr 2015 09:20:48 +0000 Received: (qmail 19914 invoked by uid 99); 9 Apr 2015 09:20:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Apr 2015 09:20:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B45D0DFF79; Thu, 9 Apr 2015 09:20:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 09 Apr 2015 09:20:28 -0000 Message-Id: <9eb5781e70de430bb06034158cc7bf03@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/12] incubator-ignite git commit: # ignite-694 X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-3 5fcb07292 -> 8803e1798 # ignite-694 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/db754aba Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/db754aba Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/db754aba Branch: refs/heads/ignite-sprint-3 Commit: db754aba1fcf9d762dba9b96e5333afde7efa0dd Parents: 6553549 Author: sboikov Authored: Tue Apr 7 17:17:08 2015 +0300 Committer: sboikov Committed: Tue Apr 7 17:17:08 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 77 ++++++------ .../GridCachePartitionExchangeManager.java | 121 ++++++++++++++++++- .../IgniteCacheAtomicNodeJoinTest.java | 44 +++++++ .../IgniteCacheNodeJoinAbstractTest.java | 109 +++++++++++++++++ .../distributed/IgniteCacheTxNodeJoinTest.java | 38 ++++++ .../IgniteCacheFailoverTestSuite.java | 4 + 6 files changed, 356 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 7bd6726..8805853 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; @@ -76,7 +77,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** Message listener. */ private GridMessageListener lsnr = new GridMessageListener() { - @SuppressWarnings("unchecked") @Override public void onMessage(final UUID nodeId, Object msg) { if (log.isDebugEnabled()) log.debug("Received unordered cache communication message [nodeId=" + nodeId + @@ -84,44 +84,20 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { final GridCacheMessage cacheMsg = (GridCacheMessage)msg; - int msgIdx = cacheMsg.lookupIndex(); + AffinityTopologyVersion locAffVer = cctx.exchange().topologyVersion(); + AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion(); - IgniteBiInClosure c = null; - - if (msgIdx >= 0) { - IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheMsg.cacheId()); - - if (cacheClsHandlers != null) - c = cacheClsHandlers[msgIdx]; - } - - if (c == null) - c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); - - if (c == null) { - if (log.isDebugEnabled()) - log.debug("Received message without registered handler (will ignore) [msg=" + msg + - ", nodeId=" + nodeId + ']'); - - return; - } - - long locTopVer = cctx.discovery().topologyVersion(); - long rmtTopVer = cacheMsg.topologyVersion().topologyVersion(); - - if (locTopVer < rmtTopVer) { + if (locAffVer.compareTo(rmtAffVer) < 0) { if (log.isDebugEnabled()) log.debug("Received message has higher topology version [msg=" + msg + - ", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']'); - - IgniteInternalFuture topFut = cctx.discovery().topologyFuture(rmtTopVer); + ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); - if (!topFut.isDone()) { - final IgniteBiInClosure c0 = c; + IgniteInternalFuture topFut = cctx.exchange().affinityReadyFuture(rmtAffVer); - topFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture t) { - onMessage0(nodeId, cacheMsg, c0); + if (topFut != null && !topFut.isDone()) { + topFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture t) { + handleMessage(nodeId, cacheMsg); } }); @@ -129,10 +105,41 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } } - onMessage0(nodeId, cacheMsg, c); + handleMessage(nodeId, cacheMsg); } }; + /** + * @param nodeId Sender node ID. + * @param cacheMsg Message. + */ + @SuppressWarnings("unchecked") + private void handleMessage(UUID nodeId, GridCacheMessage cacheMsg) { + int msgIdx = cacheMsg.lookupIndex(); + + IgniteBiInClosure c = null; + + if (msgIdx >= 0) { + IgniteBiInClosure[] cacheClsHandlers = idxClsHandlers.get(cacheMsg.cacheId()); + + if (cacheClsHandlers != null) + c = cacheClsHandlers[msgIdx]; + } + + if (c == null) + c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); + + if (c == null) { + if (log.isDebugEnabled()) + log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg + + ", nodeId=" + nodeId + ']'); + + return; + } + + onMessage0(nodeId, cacheMsg, c); + } + /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { retryDelay = cctx.gridConfig().getNetworkSendRetryDelay(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d2d3531..e8e3ea1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -90,6 +90,14 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** */ private volatile GridDhtPartitionsExchangeFuture lastInitializedFut; + /** */ + private final ConcurrentMap readyFuts = new ConcurrentHashMap8<>(); + + /** */ + private final AtomicReference readyTopVer = + new AtomicReference<>(AffinityTopologyVersion.NONE); + + /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -311,6 +319,9 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName())); + for (AffinityReadyFuture f : readyFuts.values()) + f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName())); + U.cancel(exchWorker); if (log.isDebugEnabled()) @@ -372,7 +383,10 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana * @return Topology version. */ public AffinityTopologyVersion topologyVersion() { - return lastInitializedFut.exchangeId().topologyVersion(); + GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; + + return lastInitializedFut0 != null + ? lastInitializedFut0.exchangeId().topologyVersion() : AffinityTopologyVersion.NONE; } /** @@ -383,6 +397,49 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** + * @param ver Topology version. + * @return Future or {@code null} is future is already completed. + */ + @Nullable IgniteInternalFuture affinityReadyFuture(AffinityTopologyVersion ver) { + GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; + + if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) { + if (log.isDebugEnabled()) + log.debug("Return lastInitializedFut for topology ready future " + + "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']'); + + return lastInitializedFut0; + } + + AffinityTopologyVersion topVer = readyTopVer.get(); + + if (topVer.compareTo(ver) >= 0) { + if (log.isDebugEnabled()) + log.debug("Return finished future for topology ready future [ver=" + ver + ", topVer=" + topVer + ']'); + + return null; + } + + GridFutureAdapter fut = F.addIfAbsent(readyFuts, ver, + new AffinityReadyFuture(ver)); + + if (log.isDebugEnabled()) + log.debug("Created topology ready future [ver=" + ver + ", fut=" + fut + ']'); + + topVer = readyTopVer.get(); + + if (topVer.compareTo(ver) >= 0) { + if (log.isDebugEnabled()) + log.debug("Completing created topology ready future " + + "[ver=" + topVer + ", topVer=" + topVer + ", fut=" + fut + ']'); + + fut.onDone(topVer); + } + + return fut; + } + + /** * @return {@code true} if entered to busy state. */ private boolean enterBusy() { @@ -642,8 +699,38 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana /** * @param exchFut Exchange. + * @param err Error. */ - public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, Throwable err) { + public void onExchangeDone(GridDhtPartitionsExchangeFuture exchFut, @Nullable Throwable err) { + if (err == null) { + AffinityTopologyVersion topVer = exchFut.topologyVersion(); + + if (log.isDebugEnabled()) + log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ']'); + + while (true) { + AffinityTopologyVersion readyVer = readyTopVer.get(); + + if (readyVer.compareTo(topVer) >= 0) + break; + + if (readyTopVer.compareAndSet(readyVer, topVer)) + break; + } + + for (Map.Entry entry : readyFuts.entrySet()) { + if (entry.getKey().compareTo(topVer) <= 0) { + if (log.isDebugEnabled()) + log.debug("Completing created topology ready future " + + "[ver=" + topVer + ", fut=" + entry.getValue() + ']'); + + entry.getValue().onDone(topVer); + } + } + } + else if (log.isDebugEnabled()) + log.debug("Exchange done with error [fut=" + exchFut + ", err=" + err + ']'); + ExchangeFutureSet exchFuts0 = exchFuts; if (exchFuts0 != null) { @@ -1130,4 +1217,34 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana */ protected abstract void onMessage(ClusterNode node, M msg); } + + /** + * Affinity ready future. + */ + private class AffinityReadyFuture extends GridFutureAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private AffinityTopologyVersion topVer; + + /** + * @param topVer Topology version. + */ + private AffinityReadyFuture(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(AffinityTopologyVersion res, @Nullable Throwable err) { + assert res != null || err != null; + + boolean done = super.onDone(res, err); + + if (done) + readyFuts.remove(topVer, this); + + return done; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java new file mode 100644 index 0000000..c0818bd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicNodeJoinTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; + +/** + * + */ +public class IgniteCacheAtomicNodeJoinTest extends IgniteCacheNodeJoinAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { + return PRIMARY; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java new file mode 100644 index 0000000..bed83ec --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.testframework.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public abstract class IgniteCacheNodeJoinAbstractTest extends IgniteCacheAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setReadFromBackup(false); // Force remote 'get'. + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testGet() throws Exception { + final IgniteCache cache = jcache(0); + + final int KEYS = 1000; + + Map map = new HashMap<>(); + + for (int i = 0; i < KEYS; i++) + map.put(i, i); + + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + cache.putAll(map); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() throws Exception { + startGrid(1); + + return null; + } + }); + + final AtomicBoolean stop = new AtomicBoolean(); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + while (!stop.get() && !fut.isDone()) { + for (int key = 0; key < KEYS; key++) { + assertNotNull(cache.get(key)); + + if (key % 100 == 0 && fut.isDone()) + break; + } + } + + return null; + } + }, 10, "test-get"); + + try { + fut.get(60_000); + } + finally { + stop.set(true); + } + + stopGrid(1); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java new file mode 100644 index 0000000..d250cd6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheTxNodeJoinTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; + +/** + * + */ +public class IgniteCacheTxNodeJoinTest extends IgniteCacheNodeJoinAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/db754aba/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index 9556235..da79c55 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -19,6 +19,7 @@ package org.apache.ignite.testsuites; import junit.framework.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; @@ -56,6 +57,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { //suite.addTestSuite(GridCacheColocatedFailoverSelfTest.class); TODO IGNITE-631. //suite.addTestSuite(GridCacheReplicatedFailoverSelfTest.class); TODO IGNITE-631. + suite.addTestSuite(IgniteCacheAtomicNodeJoinTest.class); + suite.addTestSuite(IgniteCacheTxNodeJoinTest.class); + return suite; } }